forked from antonis.lempesis/dnet-hadoop
Merged Datacite transfrom into this branch
This commit is contained in:
parent
2da8bf7429
commit
99cf3a8ea4
|
@ -15,11 +15,11 @@ object OafUtils {
|
|||
}
|
||||
|
||||
|
||||
def generateDataInfo(trust: String = "0.9", invisibile: Boolean = false): DataInfo = {
|
||||
def generateDataInfo(trust: String = "0.9", invisible: Boolean = false): DataInfo = {
|
||||
val di = new DataInfo
|
||||
di.setDeletedbyinference(false)
|
||||
di.setInferred(false)
|
||||
di.setInvisible(false)
|
||||
di.setInvisible(invisible)
|
||||
di.setTrust(trust)
|
||||
di.setProvenanceaction(createQualifier("sysimport:actionset", "dnet:provenanceActions"))
|
||||
di
|
||||
|
|
|
@ -7,10 +7,44 @@
|
|||
<version>1.2.4-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>dhp-aggregation</artifactId>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>net.alchim31.maven</groupId>
|
||||
<artifactId>scala-maven-plugin</artifactId>
|
||||
<version>${net.alchim31.maven.version}</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>scala-compile-first</id>
|
||||
<phase>initialize</phase>
|
||||
<goals>
|
||||
<goal>add-source</goal>
|
||||
<goal>compile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>scala-test-compile</id>
|
||||
<phase>process-test-resources</phase>
|
||||
<goals>
|
||||
<goal>testCompile</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<scalaVersion>${scala.version}</scalaVersion>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpclient</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-core_2.11</artifactId>
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
package eu.dnetlib.dhp.actionmanager.datacite
|
||||
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase, HttpUriRequest}
|
||||
import org.apache.http.entity.StringEntity
|
||||
import org.apache.http.impl.client.HttpClients
|
||||
|
||||
import java.io.IOException
|
||||
|
||||
abstract class AbstractRestClient extends Iterator[String]{
|
||||
|
||||
var buffer: List[String] = List()
|
||||
var current_index:Int = 0
|
||||
|
||||
var scroll_value: Option[String] = None
|
||||
|
||||
var complete:Boolean = false
|
||||
|
||||
|
||||
def extractInfo(input: String): Unit
|
||||
|
||||
protected def getBufferData(): Unit
|
||||
|
||||
|
||||
def doHTTPGETRequest(url:String): String = {
|
||||
val httpGet = new HttpGet(url)
|
||||
doHTTPRequest(httpGet)
|
||||
|
||||
}
|
||||
|
||||
def doHTTPPOSTRequest(url:String, json:String): String = {
|
||||
val httpPost = new HttpPost(url)
|
||||
if (json != null) {
|
||||
val entity = new StringEntity(json)
|
||||
httpPost.setEntity(entity)
|
||||
httpPost.setHeader("Accept", "application/json")
|
||||
httpPost.setHeader("Content-type", "application/json")
|
||||
}
|
||||
doHTTPRequest(httpPost)
|
||||
}
|
||||
|
||||
def hasNext: Boolean = {
|
||||
buffer.nonEmpty && current_index < buffer.size
|
||||
}
|
||||
|
||||
|
||||
override def next(): String = {
|
||||
val next_item:String = buffer(current_index)
|
||||
current_index = current_index + 1
|
||||
if (current_index == buffer.size)
|
||||
getBufferData()
|
||||
next_item
|
||||
}
|
||||
|
||||
|
||||
private def doHTTPRequest[A <: HttpUriRequest](r: A) :String ={
|
||||
val client = HttpClients.createDefault
|
||||
try {
|
||||
val response = client.execute(r)
|
||||
IOUtils.toString(response.getEntity.getContent)
|
||||
} catch {
|
||||
case e: Throwable =>
|
||||
throw new RuntimeException("Error on executing request ", e)
|
||||
} finally try client.close()
|
||||
catch {
|
||||
case e: IOException =>
|
||||
throw new RuntimeException("Unable to close client ", e)
|
||||
}
|
||||
}
|
||||
|
||||
getBufferData()
|
||||
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package eu.dnetlib.dhp.actionmanager.datacite
|
||||
|
||||
import org.json4s.{DefaultFormats, JValue}
|
||||
import org.json4s.jackson.JsonMethods.{compact, parse, render}
|
||||
|
||||
class DataciteAPIImporter(timestamp: Long = 0, blocks: Long = 10) extends AbstractRestClient {
|
||||
|
||||
override def extractInfo(input: String): Unit = {
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
lazy val json: org.json4s.JValue = parse(input)
|
||||
buffer = (json \ "data").extract[List[JValue]].map(s => compact(render(s)))
|
||||
val next_url = (json \ "links" \ "next").extractOrElse[String](null)
|
||||
scroll_value = if (next_url != null && next_url.nonEmpty) Some(next_url) else None
|
||||
if (scroll_value.isEmpty)
|
||||
complete = true
|
||||
current_index = 0
|
||||
}
|
||||
|
||||
override def getBufferData(): Unit = {
|
||||
if (!complete) {
|
||||
val response = if (scroll_value.isDefined) doHTTPGETRequest(scroll_value.get) else doHTTPGETRequest(s"https://api.datacite.org/dois?page[cursor]=1&page[size]=$blocks&query=updated:[$timestamp%20TO%20*]")
|
||||
extractInfo(response)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,475 @@
|
|||
package eu.dnetlib.dhp.actionmanager.datacite
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction
|
||||
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, KeyValue, Oaf, OafMapperUtils, OtherResearchProduct, Publication, Qualifier, Relation, Result, Software, StructuredProperty, Dataset => OafDataset}
|
||||
import eu.dnetlib.dhp.utils.DHPUtils
|
||||
import org.apache.commons.lang3.StringUtils
|
||||
import org.json4s.DefaultFormats
|
||||
import org.json4s.JsonAST.{JField, JObject, JString}
|
||||
import org.json4s.jackson.JsonMethods.parse
|
||||
|
||||
import java.nio.charset.CodingErrorAction
|
||||
import java.time.LocalDate
|
||||
import java.time.format.DateTimeFormatter
|
||||
import java.util.Locale
|
||||
import java.util.regex.Pattern
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.io.{Codec, Source}
|
||||
|
||||
|
||||
|
||||
case class DataciteType(doi:String,timestamp:Long,isActive:Boolean, json:String ){}
|
||||
|
||||
case class NameIdentifiersType(nameIdentifierScheme: Option[String], schemeUri: Option[String], nameIdentifier: Option[String]) {}
|
||||
|
||||
case class CreatorType(nameType: Option[String], nameIdentifiers: Option[List[NameIdentifiersType]], name: Option[String], familyName: Option[String], givenName: Option[String], affiliation: Option[List[String]]) {}
|
||||
|
||||
case class TitleType(title: Option[String], titleType: Option[String], lang: Option[String]) {}
|
||||
|
||||
case class SubjectType(subject: Option[String], subjectScheme: Option[String]) {}
|
||||
|
||||
case class DescriptionType(descriptionType: Option[String], description: Option[String]) {}
|
||||
|
||||
case class FundingReferenceType(funderIdentifierType: Option[String], awardTitle: Option[String], awardUri: Option[String], funderName: Option[String], funderIdentifier: Option[String], awardNumber: Option[String]) {}
|
||||
|
||||
case class DateType(date: Option[String], dateType: Option[String]) {}
|
||||
|
||||
case class HostedByMapType(openaire_id: String, datacite_name: String, official_name: String, similarity: Option[Float]) {}
|
||||
|
||||
object DataciteToOAFTransformation {
|
||||
|
||||
implicit val codec: Codec = Codec("UTF-8")
|
||||
codec.onMalformedInput(CodingErrorAction.REPLACE)
|
||||
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
|
||||
|
||||
private val PID_VOCABULARY = "dnet:pid_types"
|
||||
val COBJ_VOCABULARY = "dnet:publication_resource"
|
||||
val RESULT_VOCABULARY = "dnet:result_typologies"
|
||||
val ACCESS_MODE_VOCABULARY = "dnet:access_modes"
|
||||
val DOI_CLASS = "doi"
|
||||
|
||||
val TITLE_SCHEME = "dnet:dataCite_title"
|
||||
val SUBJ_CLASS = "keywords"
|
||||
val SUBJ_SCHEME = "dnet:subject_classification_typologies"
|
||||
|
||||
val j_filter:List[String] = {
|
||||
val s = Source.fromInputStream(getClass.getResourceAsStream("datacite_filter")).mkString
|
||||
s.lines.toList
|
||||
}
|
||||
|
||||
val mapper = new ObjectMapper()
|
||||
val unknown_repository: HostedByMapType = HostedByMapType("openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18", "Unknown Repository", "Unknown Repository", Some(1.0F))
|
||||
|
||||
val dataInfo: DataInfo = generateDataInfo("0.9")
|
||||
val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue("openaire____::datacite", "Datacite")
|
||||
|
||||
val hostedByMap: Map[String, HostedByMapType] = {
|
||||
val s = Source.fromInputStream(getClass.getResourceAsStream("hostedBy_map.json")).mkString
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
lazy val json: org.json4s.JValue = parse(s)
|
||||
json.extract[Map[String, HostedByMapType]]
|
||||
}
|
||||
|
||||
val df_en: DateTimeFormatter = DateTimeFormatter.ofPattern("[MM-dd-yyyy][MM/dd/yyyy][dd-MM-yy][dd-MMM-yyyy][dd/MMM/yyyy][dd-MMM-yy][dd/MMM/yy][dd-MM-yy][dd/MM/yy][dd-MM-yyyy][dd/MM/yyyy][yyyy-MM-dd][yyyy/MM/dd]", Locale.ENGLISH)
|
||||
val df_it: DateTimeFormatter = DateTimeFormatter.ofPattern("[dd-MM-yyyy][dd/MM/yyyy]", Locale.ITALIAN)
|
||||
|
||||
val funder_regex:List[(Pattern, String)] = List(
|
||||
(Pattern.compile("(info:eu-repo/grantagreement/ec/h2020/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE),"40|corda__h2020::"),
|
||||
(Pattern.compile("(info:eu-repo/grantagreement/ec/fp7/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE),"40|corda_______::")
|
||||
|
||||
)
|
||||
|
||||
val Date_regex: List[Pattern] = List(
|
||||
//Y-M-D
|
||||
Pattern.compile("(18|19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])", Pattern.MULTILINE),
|
||||
//M-D-Y
|
||||
Pattern.compile("((0[1-9]|1[012])|([1-9]))([- /.])(0[1-9]|[12][0-9]|3[01])([- /.])(18|19|20)?\\d\\d", Pattern.MULTILINE),
|
||||
//D-M-Y
|
||||
Pattern.compile("(?:(?:31(/|-|\\.)(?:0?[13578]|1[02]|(?:Jan|Mar|May|Jul|Aug|Oct|Dec)))\\1|(?:(?:29|30)(/|-|\\.)(?:0?[1,3-9]|1[0-2]|(?:Jan|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))\\2))(?:(?:1[6-9]|[2-9]\\d)?\\d{2})|(?:29(/|-|\\.)(?:0?2|(?:Feb))\\3(?:(?:(?:1[6-9]|[2-9]\\d)?(?:0[48]|[2468][048]|[13579][26])|(?:(?:16|[2468][048]|[3579][26])00))))|(?:0?[1-9]|1\\d|2[0-8])(/|-|\\.)(?:(?:0?[1-9]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep))|(?:1[0-2]|(?:Oct|Nov|Dec)))\\4(?:(?:1[6-9]|[2-9]\\d)?\\d{2})", Pattern.MULTILINE),
|
||||
//Y
|
||||
Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE)
|
||||
)
|
||||
|
||||
|
||||
def filter_json(json:String):Boolean = {
|
||||
j_filter.exists(f => json.contains(f))
|
||||
}
|
||||
|
||||
def toActionSet(item:Oaf) :(String, String) = {
|
||||
val mapper = new ObjectMapper()
|
||||
|
||||
item match {
|
||||
case dataset: OafDataset =>
|
||||
val a: AtomicAction[OafDataset] = new AtomicAction[OafDataset]
|
||||
a.setClazz(classOf[OafDataset])
|
||||
a.setPayload(dataset)
|
||||
(dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
case publication: Publication =>
|
||||
val a: AtomicAction[Publication] = new AtomicAction[Publication]
|
||||
a.setClazz(classOf[Publication])
|
||||
a.setPayload(publication)
|
||||
(publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
case software: Software =>
|
||||
val a: AtomicAction[Software] = new AtomicAction[Software]
|
||||
a.setClazz(classOf[Software])
|
||||
a.setPayload(software)
|
||||
(software.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
case orp: OtherResearchProduct =>
|
||||
val a: AtomicAction[OtherResearchProduct] = new AtomicAction[OtherResearchProduct]
|
||||
a.setClazz(classOf[OtherResearchProduct])
|
||||
a.setPayload(orp)
|
||||
(orp.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
|
||||
case relation: Relation =>
|
||||
val a: AtomicAction[Relation] = new AtomicAction[Relation]
|
||||
a.setClazz(classOf[Relation])
|
||||
a.setPayload(relation)
|
||||
(relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
case _ =>
|
||||
null
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
def embargo_end(embargo_end_date: String): Boolean = {
|
||||
val dt = LocalDate.parse(embargo_end_date, DateTimeFormatter.ofPattern("[yyyy-MM-dd]"))
|
||||
val td = LocalDate.now()
|
||||
td.isAfter(dt)
|
||||
}
|
||||
|
||||
|
||||
def extract_date(input: String): Option[String] = {
|
||||
val d = Date_regex.map(pattern => {
|
||||
val matcher = pattern.matcher(input)
|
||||
if (matcher.find())
|
||||
matcher.group(0)
|
||||
else
|
||||
null
|
||||
}
|
||||
).find(s => s != null)
|
||||
|
||||
if (d.isDefined) {
|
||||
val a_date = if (d.get.length == 4) s"01-01-${d.get}" else d.get
|
||||
try {
|
||||
return Some(LocalDate.parse(a_date, df_en).toString)
|
||||
} catch {
|
||||
case _: Throwable => try {
|
||||
return Some(LocalDate.parse(a_date, df_it).toString)
|
||||
} catch {
|
||||
case _: Throwable => try {
|
||||
return None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
d
|
||||
}
|
||||
|
||||
def getTypeQualifier(resourceType: String, resourceTypeGeneral: String, schemaOrg: String, vocabularies:VocabularyGroup): (Qualifier, Qualifier) = {
|
||||
if (resourceType != null && resourceType.nonEmpty) {
|
||||
val typeQualifier = vocabularies.getSynonymAsQualifier(COBJ_VOCABULARY, resourceType)
|
||||
if (typeQualifier != null)
|
||||
return (typeQualifier, vocabularies.getSynonymAsQualifier(RESULT_VOCABULARY, typeQualifier.getClassid))
|
||||
}
|
||||
if (schemaOrg != null && schemaOrg.nonEmpty) {
|
||||
val typeQualifier = vocabularies.getSynonymAsQualifier(COBJ_VOCABULARY, schemaOrg)
|
||||
if (typeQualifier != null)
|
||||
return (typeQualifier, vocabularies.getSynonymAsQualifier(RESULT_VOCABULARY, typeQualifier.getClassid))
|
||||
|
||||
}
|
||||
if (resourceTypeGeneral != null && resourceTypeGeneral.nonEmpty) {
|
||||
val typeQualifier = vocabularies.getSynonymAsQualifier(COBJ_VOCABULARY, resourceTypeGeneral)
|
||||
if (typeQualifier != null)
|
||||
return (typeQualifier, vocabularies.getSynonymAsQualifier(RESULT_VOCABULARY, typeQualifier.getClassid))
|
||||
|
||||
}
|
||||
null
|
||||
}
|
||||
|
||||
|
||||
def getResult(resourceType: String, resourceTypeGeneral: String, schemaOrg: String, vocabularies:VocabularyGroup): Result = {
|
||||
val typeQualifiers: (Qualifier, Qualifier) = getTypeQualifier(resourceType, resourceTypeGeneral, schemaOrg, vocabularies)
|
||||
if (typeQualifiers == null)
|
||||
return null
|
||||
val i = new Instance
|
||||
i.setInstancetype(typeQualifiers._1)
|
||||
typeQualifiers._2.getClassname match {
|
||||
case "dataset" =>
|
||||
val r = new OafDataset
|
||||
r.setInstance(List(i).asJava)
|
||||
return r
|
||||
case "publication" =>
|
||||
val r = new Publication
|
||||
r.setInstance(List(i).asJava)
|
||||
return r
|
||||
case "software" =>
|
||||
val r = new Software
|
||||
r.setInstance(List(i).asJava)
|
||||
return r
|
||||
case "other" =>
|
||||
val r = new OtherResearchProduct
|
||||
r.setInstance(List(i).asJava)
|
||||
return r
|
||||
}
|
||||
null
|
||||
}
|
||||
|
||||
|
||||
def available_date(input: String): Boolean = {
|
||||
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
lazy val json: org.json4s.JValue = parse(input)
|
||||
val l: List[String] = for {
|
||||
JObject(dates) <- json \\ "dates"
|
||||
JField("dateType", JString(dateTypes)) <- dates
|
||||
} yield dateTypes
|
||||
|
||||
l.exists(p => p.equalsIgnoreCase("available"))
|
||||
|
||||
}
|
||||
|
||||
|
||||
def generateOAFDate(dt: String, q: Qualifier): StructuredProperty = {
|
||||
OafMapperUtils.structuredProperty(dt, q, null)
|
||||
}
|
||||
|
||||
def generateRelation(sourceId:String, targetId:String, relClass:String, cf:KeyValue, di:DataInfo) :Relation = {
|
||||
|
||||
val r = new Relation
|
||||
r.setSource(sourceId)
|
||||
r.setTarget(targetId)
|
||||
r.setRelType("resultProject")
|
||||
r.setRelClass(relClass)
|
||||
r.setSubRelType("outcome")
|
||||
r.setCollectedfrom(List(cf).asJava)
|
||||
r.setDataInfo(di)
|
||||
r
|
||||
|
||||
|
||||
}
|
||||
|
||||
def get_projectRelation(awardUri:String, sourceId:String):List[Relation] = {
|
||||
val match_pattern = funder_regex.find(s =>s._1.matcher(awardUri).find())
|
||||
|
||||
if (match_pattern.isDefined) {
|
||||
val m =match_pattern.get._1
|
||||
val p = match_pattern.get._2
|
||||
val grantId = m.matcher(awardUri).replaceAll("$2")
|
||||
val targetId = s"$p${DHPUtils.md5(grantId)}"
|
||||
List(
|
||||
generateRelation(sourceId, targetId,"isProducedBy", DATACITE_COLLECTED_FROM, dataInfo),
|
||||
generateRelation(targetId, sourceId,"produces", DATACITE_COLLECTED_FROM, dataInfo)
|
||||
)
|
||||
}
|
||||
else
|
||||
List()
|
||||
|
||||
}
|
||||
|
||||
|
||||
def generateOAF(input:String,ts:Long, dateOfCollection:Long, vocabularies: VocabularyGroup):List[Oaf] = {
|
||||
if (filter_json(input))
|
||||
return List()
|
||||
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
lazy val json = parse(input)
|
||||
|
||||
val resourceType = (json \ "attributes" \ "types" \ "resourceType").extractOrElse[String](null)
|
||||
val resourceTypeGeneral = (json \ "attributes" \ "types" \ "resourceTypeGeneral").extractOrElse[String](null)
|
||||
val schemaOrg = (json \ "attributes" \ "types" \ "schemaOrg").extractOrElse[String](null)
|
||||
|
||||
val doi = (json \ "attributes" \ "doi").extract[String]
|
||||
if (doi.isEmpty)
|
||||
return List()
|
||||
|
||||
//Mapping type based on vocabularies dnet:publication_resource and dnet:result_typologies
|
||||
val result = getResult(resourceType, resourceTypeGeneral, schemaOrg, vocabularies)
|
||||
if (result == null)
|
||||
return List()
|
||||
|
||||
|
||||
val doi_q = vocabularies.getSynonymAsQualifier(PID_VOCABULARY, "doi")
|
||||
val pid = OafMapperUtils.structuredProperty(doi, doi_q, dataInfo)
|
||||
result.setPid(List(pid).asJava)
|
||||
result.setId(OafMapperUtils.createOpenaireId(50, s"datacite____::$doi", true))
|
||||
result.setOriginalId(List(doi).asJava)
|
||||
result.setDateofcollection(s"${dateOfCollection}")
|
||||
result.setDateoftransformation(s"$ts")
|
||||
result.setDataInfo(dataInfo)
|
||||
|
||||
val creators = (json \\ "creators").extractOrElse[List[CreatorType]](List())
|
||||
|
||||
|
||||
val authors = creators.zipWithIndex.map { case (c, idx) =>
|
||||
val a = new Author
|
||||
a.setFullname(c.name.orNull)
|
||||
a.setName(c.givenName.orNull)
|
||||
a.setSurname(c.familyName.orNull)
|
||||
if (c.nameIdentifiers!= null&& c.nameIdentifiers.isDefined && c.nameIdentifiers.get != null) {
|
||||
a.setPid(c.nameIdentifiers.get.map(ni => {
|
||||
val q = if (ni.nameIdentifierScheme.isDefined) vocabularies.getTermAsQualifier(PID_VOCABULARY, ni.nameIdentifierScheme.get.toLowerCase()) else null
|
||||
if (ni.nameIdentifier!= null && ni.nameIdentifier.isDefined) {
|
||||
OafMapperUtils.structuredProperty(ni.nameIdentifier.get, q, dataInfo)
|
||||
}
|
||||
else
|
||||
null
|
||||
|
||||
}
|
||||
)
|
||||
.asJava)
|
||||
}
|
||||
if (c.affiliation.isDefined)
|
||||
a.setAffiliation(c.affiliation.get.filter(af => af.nonEmpty).map(af => OafMapperUtils.field(af, dataInfo)).asJava)
|
||||
a.setRank(idx + 1)
|
||||
a
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
val titles:List[TitleType] = (json \\ "titles").extractOrElse[List[TitleType]](List())
|
||||
|
||||
result.setTitle(titles.filter(t => t.title.nonEmpty).map(t => {
|
||||
if (t.titleType.isEmpty) {
|
||||
OafMapperUtils.structuredProperty(t.title.get, "main title", "main title", TITLE_SCHEME, TITLE_SCHEME, null)
|
||||
} else {
|
||||
OafMapperUtils.structuredProperty(t.title.get, t.titleType.get, t.titleType.get, TITLE_SCHEME, TITLE_SCHEME, null)
|
||||
}
|
||||
}).asJava)
|
||||
|
||||
if(authors==null || authors.isEmpty || !authors.exists(a => a !=null))
|
||||
return List()
|
||||
result.setAuthor(authors.asJava)
|
||||
|
||||
val dates = (json \\ "dates").extract[List[DateType]]
|
||||
val publication_year = (json \\ "publicationYear").extractOrElse[String](null)
|
||||
|
||||
val i_date = dates
|
||||
.filter(d => d.date.isDefined && d.dateType.isDefined)
|
||||
.find(d => d.dateType.get.equalsIgnoreCase("issued"))
|
||||
.map(d => extract_date(d.date.get))
|
||||
val a_date: Option[String] = dates
|
||||
.filter(d => d.date.isDefined && d.dateType.isDefined && d.dateType.get.equalsIgnoreCase("available"))
|
||||
.map(d => extract_date(d.date.get))
|
||||
.find(d => d != null && d.isDefined)
|
||||
.map(d => d.get)
|
||||
|
||||
if (a_date.isDefined) {
|
||||
result.setEmbargoenddate(OafMapperUtils.field(a_date.get, null))
|
||||
}
|
||||
if (i_date.isDefined && i_date.get.isDefined) {
|
||||
result.setDateofacceptance(OafMapperUtils.field(i_date.get.get, null))
|
||||
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(i_date.get.get, null))
|
||||
}
|
||||
else if (publication_year != null) {
|
||||
result.setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null))
|
||||
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null))
|
||||
}
|
||||
|
||||
|
||||
result.setRelevantdate(dates.filter(d => d.date.isDefined && d.dateType.isDefined)
|
||||
.map(d => (extract_date(d.date.get), d.dateType.get))
|
||||
.filter(d => d._1.isDefined)
|
||||
.map(d => (d._1.get, vocabularies.getTermAsQualifier("dnet:dataCite_date", d._2.toLowerCase())))
|
||||
.filter(d => d._2 != null)
|
||||
.map(d => generateOAFDate(d._1, d._2)).asJava)
|
||||
|
||||
val subjects = (json \\ "subjects").extract[List[SubjectType]]
|
||||
|
||||
result.setSubject(subjects.filter(s => s.subject.nonEmpty)
|
||||
.map(s =>
|
||||
OafMapperUtils.structuredProperty(s.subject.get, SUBJ_CLASS, SUBJ_CLASS, SUBJ_SCHEME, SUBJ_SCHEME, null)
|
||||
).asJava)
|
||||
|
||||
|
||||
result.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
|
||||
|
||||
val descriptions = (json \\ "descriptions").extract[List[DescriptionType]]
|
||||
|
||||
result.setDescription(
|
||||
descriptions
|
||||
.filter(d => d.description.isDefined).
|
||||
map(d =>
|
||||
OafMapperUtils.field(d.description.get, null)
|
||||
).filter(s => s!=null).asJava)
|
||||
|
||||
|
||||
val publisher = (json \\ "publisher").extractOrElse[String](null)
|
||||
if (publisher != null)
|
||||
result.setPublisher(OafMapperUtils.field(publisher, null))
|
||||
|
||||
|
||||
val language: String = (json \\ "language").extractOrElse[String](null)
|
||||
|
||||
if (language != null)
|
||||
result.setLanguage(vocabularies.getSynonymAsQualifier("dnet:languages", language))
|
||||
|
||||
|
||||
val instance = result.getInstance().get(0)
|
||||
|
||||
val client = (json \ "relationships" \ "client" \\ "id").extractOpt[String]
|
||||
|
||||
val accessRights:List[String] = for {
|
||||
JObject(rightsList) <- json \\ "rightsList"
|
||||
JField("rightsUri", JString(rightsUri)) <- rightsList
|
||||
} yield rightsUri
|
||||
|
||||
val aRights: Option[Qualifier] = accessRights.map(r => {
|
||||
vocabularies.getSynonymAsQualifier(ACCESS_MODE_VOCABULARY, r)
|
||||
}).find(q => q != null)
|
||||
|
||||
|
||||
val access_rights_qualifier = if (aRights.isDefined) aRights.get else OafMapperUtils.qualifier("UNKNOWN", "not available", ACCESS_MODE_VOCABULARY, ACCESS_MODE_VOCABULARY)
|
||||
|
||||
if (client.isDefined) {
|
||||
val hb = hostedByMap.getOrElse(client.get.toUpperCase(), unknown_repository)
|
||||
instance.setHostedby(OafMapperUtils.keyValue(generateDSId(hb.openaire_id), hb.official_name))
|
||||
instance.setCollectedfrom(DATACITE_COLLECTED_FROM)
|
||||
instance.setUrl(List(s"https://dx.doi.org/$doi").asJava)
|
||||
instance.setAccessright(access_rights_qualifier)
|
||||
|
||||
//'http') and matches(., '.*(/licenses|/publicdomain|unlicense.org/|/legal-and-data-protection-notices|/download/license|/open-government-licence).*')]">
|
||||
val license = accessRights
|
||||
.find(r => r.startsWith("http") && r.matches(".*(/licenses|/publicdomain|unlicense\\.org/|/legal-and-data-protection-notices|/download/license|/open-government-licence).*"))
|
||||
if (license.isDefined)
|
||||
instance.setLicense(OafMapperUtils.field(license.get, null))
|
||||
}
|
||||
|
||||
|
||||
val awardUris:List[String] = for {
|
||||
JObject(fundingReferences) <- json \\ "fundingReferences"
|
||||
JField("awardUri", JString(awardUri)) <- fundingReferences
|
||||
} yield awardUri
|
||||
|
||||
val relations:List[Relation] =awardUris.flatMap(a=> get_projectRelation(a, result.getId)).filter(r => r!= null)
|
||||
|
||||
if (relations!= null && relations.nonEmpty) {
|
||||
List(result):::relations
|
||||
}
|
||||
else
|
||||
List(result)
|
||||
}
|
||||
|
||||
def generateDataInfo(trust: String): DataInfo = {
|
||||
val di = new DataInfo
|
||||
di.setDeletedbyinference(false)
|
||||
di.setInferred(false)
|
||||
di.setInvisible(false)
|
||||
di.setTrust(trust)
|
||||
di.setProvenanceaction(OafMapperUtils.qualifier("sysimport:actionset", "sysimport:actionset", "dnet:provenanceActions", "dnet:provenanceActions"))
|
||||
di
|
||||
}
|
||||
|
||||
def generateDSId(input: String): String = {
|
||||
val b = StringUtils.substringBefore(input, "::")
|
||||
val a = StringUtils.substringAfter(input, "::")
|
||||
s"10|$b::${DHPUtils.md5(a)}"
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
package eu.dnetlib.dhp.actionmanager.datacite
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf
|
||||
import org.apache.hadoop.io.Text
|
||||
import org.apache.hadoop.io.compress.GzipCodec
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.io.Source
|
||||
|
||||
object ExportActionSetJobNode {
|
||||
|
||||
val log: Logger = LoggerFactory.getLogger(ExportActionSetJobNode.getClass)
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val conf = new SparkConf
|
||||
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json")).mkString)
|
||||
parser.parseArgument(args)
|
||||
val master = parser.get("master")
|
||||
val sourcePath = parser.get("sourcePath")
|
||||
val targetPath = parser.get("targetPath")
|
||||
|
||||
val spark: SparkSession = SparkSession.builder().config(conf)
|
||||
.appName(ExportActionSetJobNode.getClass.getSimpleName)
|
||||
.master(master)
|
||||
.getOrCreate()
|
||||
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||
implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING)
|
||||
|
||||
spark.read.load(sourcePath).as[Oaf]
|
||||
.map(o =>DataciteToOAFTransformation.toActionSet(o))
|
||||
.filter(o => o!= null)
|
||||
.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
package eu.dnetlib.dhp.actionmanager.datacite
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
|
||||
import eu.dnetlib.dhp.model.mdstore.MetadataRecord
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf
|
||||
import eu.dnetlib.dhp.utils.ISLookupClientFactory
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import scala.io.Source
|
||||
|
||||
object GenerateDataciteDatasetSpark {
|
||||
|
||||
val log: Logger = LoggerFactory.getLogger(GenerateDataciteDatasetSpark.getClass)
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val conf = new SparkConf
|
||||
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString)
|
||||
parser.parseArgument(args)
|
||||
val master = parser.get("master")
|
||||
val sourcePath = parser.get("sourcePath")
|
||||
val targetPath = parser.get("targetPath")
|
||||
val isLookupUrl: String = parser.get("isLookupUrl")
|
||||
log.info("isLookupUrl: {}", isLookupUrl)
|
||||
|
||||
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
|
||||
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
|
||||
|
||||
val spark: SparkSession = SparkSession.builder().config(conf)
|
||||
.appName(GenerateDataciteDatasetSpark.getClass.getSimpleName)
|
||||
.master(master)
|
||||
.getOrCreate()
|
||||
|
||||
implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord]
|
||||
|
||||
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||
|
||||
import spark.implicits._
|
||||
|
||||
spark.read.load(sourcePath).as[DataciteType]
|
||||
.filter(d => d.isActive)
|
||||
.flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies))
|
||||
.filter(d => d != null)
|
||||
.write.mode(SaveMode.Overwrite).save(targetPath)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,168 @@
|
|||
package eu.dnetlib.dhp.actionmanager.datacite
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem
|
||||
import org.apache.hadoop.io.{IntWritable, SequenceFile, Text}
|
||||
import org.apache.spark.SparkContext
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.expressions.Aggregator
|
||||
import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession}
|
||||
import org.json4s.DefaultFormats
|
||||
import org.json4s.jackson.JsonMethods.parse
|
||||
import org.apache.spark.sql.functions.max
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
import java.time.format.DateTimeFormatter._
|
||||
import java.time.{LocalDateTime, ZoneOffset}
|
||||
import scala.io.Source
|
||||
|
||||
object ImportDatacite {
|
||||
|
||||
val log: Logger = LoggerFactory.getLogger(ImportDatacite.getClass)
|
||||
|
||||
|
||||
def convertAPIStringToDataciteItem(input:String): DataciteType = {
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
lazy val json: org.json4s.JValue = parse(input)
|
||||
val doi = (json \ "attributes" \ "doi").extract[String].toLowerCase
|
||||
|
||||
val isActive = (json \ "attributes" \ "isActive").extract[Boolean]
|
||||
|
||||
val timestamp_string = (json \ "attributes" \ "updated").extract[String]
|
||||
val dt = LocalDateTime.parse(timestamp_string, ISO_DATE_TIME)
|
||||
DataciteType(doi = doi, timestamp = dt.toInstant(ZoneOffset.UTC).toEpochMilli/1000, isActive = isActive, json = input)
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
|
||||
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json")).mkString)
|
||||
parser.parseArgument(args)
|
||||
val master = parser.get("master")
|
||||
|
||||
val hdfsuri = parser.get("namenode")
|
||||
log.info(s"namenode is $hdfsuri")
|
||||
|
||||
val targetPath = parser.get("targetPath")
|
||||
log.info(s"targetPath is $targetPath")
|
||||
|
||||
val dataciteDump = parser.get("dataciteDumpPath")
|
||||
log.info(s"dataciteDump is $dataciteDump")
|
||||
|
||||
val hdfsTargetPath =new Path(targetPath)
|
||||
log.info(s"hdfsTargetPath is $hdfsTargetPath")
|
||||
|
||||
val spark: SparkSession = SparkSession.builder()
|
||||
.appName(ImportDatacite.getClass.getSimpleName)
|
||||
.master(master)
|
||||
.getOrCreate()
|
||||
|
||||
// ====== Init HDFS File System Object
|
||||
val conf = new Configuration
|
||||
// Set FileSystem URI
|
||||
conf.set("fs.defaultFS", hdfsuri)
|
||||
|
||||
// Because of Maven
|
||||
conf.set("fs.hdfs.impl", classOf[DistributedFileSystem].getName)
|
||||
conf.set("fs.file.impl", classOf[LocalFileSystem].getName)
|
||||
val sc:SparkContext = spark.sparkContext
|
||||
sc.setLogLevel("ERROR")
|
||||
|
||||
import spark.implicits._
|
||||
|
||||
|
||||
val dataciteAggregator: Aggregator[DataciteType, DataciteType, DataciteType] = new Aggregator[DataciteType, DataciteType, DataciteType] with Serializable {
|
||||
|
||||
override def zero: DataciteType = null
|
||||
|
||||
override def reduce(a: DataciteType, b: DataciteType): DataciteType = {
|
||||
if (b == null)
|
||||
return a
|
||||
if (a == null)
|
||||
return b
|
||||
if(a.timestamp >b.timestamp) {
|
||||
return a
|
||||
}
|
||||
b
|
||||
}
|
||||
|
||||
override def merge(a: DataciteType, b: DataciteType): DataciteType = {
|
||||
reduce(a,b)
|
||||
}
|
||||
|
||||
override def bufferEncoder: Encoder[DataciteType] = implicitly[Encoder[DataciteType]]
|
||||
|
||||
override def outputEncoder: Encoder[DataciteType] = implicitly[Encoder[DataciteType]]
|
||||
|
||||
override def finish(reduction: DataciteType): DataciteType = reduction
|
||||
}
|
||||
|
||||
val dump:Dataset[DataciteType] = spark.read.load(dataciteDump).as[DataciteType]
|
||||
val ts = dump.select(max("timestamp")).first().getLong(0)
|
||||
|
||||
log.info(s"last Timestamp is $ts")
|
||||
|
||||
val cnt = writeSequenceFile(hdfsTargetPath, ts, conf)
|
||||
|
||||
log.info(s"Imported from Datacite API $cnt documents")
|
||||
|
||||
if (cnt > 0) {
|
||||
|
||||
val inputRdd:RDD[DataciteType] = sc.sequenceFile(targetPath, classOf[Int], classOf[Text])
|
||||
.map(s => s._2.toString)
|
||||
.map(s => convertAPIStringToDataciteItem(s))
|
||||
spark.createDataset(inputRdd).write.mode(SaveMode.Overwrite).save(s"${targetPath}_dataset")
|
||||
|
||||
val ds:Dataset[DataciteType] = spark.read.load(s"${targetPath}_dataset").as[DataciteType]
|
||||
|
||||
dump
|
||||
.union(ds)
|
||||
.groupByKey(_.doi)
|
||||
.agg(dataciteAggregator.toColumn)
|
||||
.map(s=>s._2)
|
||||
.repartition(4000)
|
||||
.write.mode(SaveMode.Overwrite).save(s"${dataciteDump}_updated")
|
||||
|
||||
val fs = FileSystem.get(sc.hadoopConfiguration)
|
||||
fs.delete(new Path(s"$dataciteDump"), true)
|
||||
fs.rename(new Path(s"${dataciteDump}_updated"),new Path(s"$dataciteDump"))
|
||||
}
|
||||
}
|
||||
|
||||
private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration):Long = {
|
||||
val client = new DataciteAPIImporter(timestamp*1000, 1000)
|
||||
var i = 0
|
||||
try {
|
||||
val writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfsTargetPath), SequenceFile.Writer.keyClass(classOf[IntWritable]), SequenceFile.Writer.valueClass(classOf[Text]))
|
||||
try {
|
||||
|
||||
var start: Long = System.currentTimeMillis
|
||||
var end: Long = 0
|
||||
val key: IntWritable = new IntWritable(i)
|
||||
val value: Text = new Text
|
||||
while ( {
|
||||
client.hasNext
|
||||
}) {
|
||||
key.set({
|
||||
i += 1;
|
||||
i - 1
|
||||
})
|
||||
value.set(client.next())
|
||||
writer.append(key, value)
|
||||
writer.hflush()
|
||||
if (i % 1000 == 0) {
|
||||
end = System.currentTimeMillis
|
||||
val time = (end - start) / 1000.0F
|
||||
println(s"Imported $i in $time seconds")
|
||||
start = System.currentTimeMillis
|
||||
}
|
||||
}
|
||||
} finally if (writer != null) writer.close()
|
||||
}
|
||||
i
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
TUBYDI - Assistir Filmes e Series Online Grátis
|
||||
123Movies
|
||||
WATCH FULL MOVIE
|
||||
Movierulz
|
||||
Full Movie Online
|
||||
MOVIé WatcH
|
||||
The King of Staten Island 2020 Online For Free
|
||||
Watch Train to Busan 2 2020 online for free
|
||||
Sixth Sense Movie Novelization
|
||||
Film Complet streaming vf gratuit en ligne
|
||||
watch now free
|
||||
LIVE stream watch
|
||||
LIVE stream UFC
|
||||
RBC Heritage live stream
|
||||
MLBStreams Free
|
||||
NFL Live Stream
|
||||
Live Stream Free
|
||||
Royal Ascot 2020 Live Stream
|
||||
TV Shows Full Episodes Official
|
||||
FuboTV
|
||||
Gomovies
|
||||
Online Free Trial Access
|
||||
123watch
|
||||
DÜŞÜK HAPI
|
||||
Bebek Düşürme Yöntemleri
|
||||
WHATSAP İLETİŞİM
|
||||
Cytotec
|
||||
düşük hapı
|
|
@ -0,0 +1,21 @@
|
|||
[
|
||||
{
|
||||
"paramName": "s",
|
||||
"paramLongName": "sourcePath",
|
||||
"paramDescription": "the source mdstore path",
|
||||
"paramRequired": true
|
||||
},
|
||||
|
||||
{
|
||||
"paramName": "t",
|
||||
"paramLongName": "targetPath",
|
||||
"paramDescription": "the target mdstore path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "m",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "the master name",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,33 @@
|
|||
[
|
||||
{
|
||||
"paramName": "s",
|
||||
"paramLongName": "sourcePath",
|
||||
"paramDescription": "the source mdstore path",
|
||||
"paramRequired": true
|
||||
},
|
||||
|
||||
{
|
||||
"paramName": "t",
|
||||
"paramLongName": "targetPath",
|
||||
"paramDescription": "the target mdstore path",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "tr",
|
||||
"paramLongName": "transformationRule",
|
||||
"paramDescription": "the transformation Rule",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "m",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "the master name",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "i",
|
||||
"paramLongName": "isLookupUrl",
|
||||
"paramDescription": "the isLookup URL",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,27 @@
|
|||
[
|
||||
{
|
||||
"paramName": "t",
|
||||
"paramLongName": "targetPath",
|
||||
"paramDescription": "the path of the sequencial file to write",
|
||||
"paramRequired": true
|
||||
},
|
||||
|
||||
{
|
||||
"paramName": "d",
|
||||
"paramLongName": "dataciteDumpPath",
|
||||
"paramDescription": "the path of the Datacite dump",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "n",
|
||||
"paramLongName": "namenode",
|
||||
"paramDescription": "the hive metastore uris",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "m",
|
||||
"paramLongName": "master",
|
||||
"paramDescription": "the master name",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,18 @@
|
|||
<configuration>
|
||||
<property>
|
||||
<name>jobTracker</name>
|
||||
<value>yarnRM</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>nameNode</name>
|
||||
<value>hdfs://nameservice1</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.use.system.libpath</name>
|
||||
<value>true</value>
|
||||
</property>
|
||||
<property>
|
||||
<name>oozie.action.sharelib.for.spark</name>
|
||||
<value>spark2</value>
|
||||
</property>
|
||||
</configuration>
|
|
@ -0,0 +1,103 @@
|
|||
<workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>mdstoreInputPath</name>
|
||||
<description>the path of the input MDStore</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mdstoreOutputPath</name>
|
||||
<description>the path of the cleaned mdstore</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>nativeInputPath</name>
|
||||
<description>the path of the input MDStore</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ImportDatacite"/>
|
||||
<kill name="Kill">
|
||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<action name="ImportDatacite">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ImportDatacite</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.datacite.ImportDatacite</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>-t</arg><arg>${nativeInputPath}</arg>
|
||||
<arg>-d</arg><arg>${mdstoreInputPath}</arg>
|
||||
<arg>-n</arg><arg>${nameNode}</arg>
|
||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||
</spark>
|
||||
<ok to="TransformJob"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="TransformJob">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>TransformJob</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${mdstoreInputPath}</arg>
|
||||
<arg>--targetPath</arg><arg>${mdstoreOutputPath}</arg>
|
||||
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>-tr</arg><arg>${isLookupUrl}</arg>
|
||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||
</spark>
|
||||
<ok to="ExportDataset"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<action name="ExportDataset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>ExportDataset</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.datacite.ExportActionSetJobNode</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.sql.shuffle.partitions=3840
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--sourcePath</arg><arg>${mdstoreOutputPath}</arg>
|
||||
<arg>--targetPath</arg><arg>${mdstoreOutputPath}_raw_AS</arg>
|
||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -111,12 +111,12 @@ object DoiBoostMappingUtil {
|
|||
result.getInstance().asScala.foreach(i => i.setInstancetype(instanceType.get.getInstancetype))
|
||||
}
|
||||
result.getInstance().asScala.foreach(i => {
|
||||
i.setHostedby(getUbknownHostedBy())
|
||||
i.setHostedby(getUnknownHostedBy())
|
||||
})
|
||||
result
|
||||
}
|
||||
|
||||
def getUbknownHostedBy():KeyValue = {
|
||||
def getUnknownHostedBy():KeyValue = {
|
||||
val hb = new KeyValue
|
||||
hb.setValue("Unknown Repository")
|
||||
hb.setKey(s"10|$OPENAIRE_PREFIX::55045bd2a65019fd8e6741a755395c8c")
|
||||
|
|
|
@ -224,7 +224,7 @@ object DLIToOAF {
|
|||
if (cleanedPids.isEmpty)
|
||||
return null
|
||||
result.setId(generateId(inputPublication.getId))
|
||||
result.setDataInfo(generateDataInfo(invisibile = true))
|
||||
result.setDataInfo(generateDataInfo(invisible = true))
|
||||
if (inputPublication.getCollectedfrom == null || inputPublication.getCollectedfrom.size() == 0 || (inputPublication.getCollectedfrom.size() == 1 && inputPublication.getCollectedfrom.get(0) == null))
|
||||
return null
|
||||
result.setCollectedfrom(inputPublication.getCollectedfrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
|
||||
|
|
Loading…
Reference in New Issue