Merge remote-tracking branch 'upstream/master' into usage-stats-export-wf-v2

This commit is contained in:
dimitrispie 2021-06-22 09:32:46 +03:00
commit bb2ef1b74d
24 changed files with 4367 additions and 424 deletions

View File

@ -115,6 +115,8 @@ public class AuthorMerger {
}
public static String pidToComparableString(StructuredProperty pid) {
if (pid == null)
return "";
return (pid.getQualifier() != null
? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : ""
: "")

View File

@ -7,6 +7,37 @@
<version>1.2.4-SNAPSHOT</version>
</parent>
<artifactId>dhp-aggregation</artifactId>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>initialize</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
@ -24,12 +55,6 @@
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -37,6 +62,13 @@
<artifactId>dhp-schemas</artifactId>
</dependency>
<dependency>
<groupId>eu.dnetlib.dhp</groupId>
<artifactId>dhp-graph-mapper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>net.sf.saxon</groupId>

View File

@ -0,0 +1,544 @@
package eu.dnetlib.dhp.actionmanager.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, KeyValue, Oaf, OafMapperUtils, OtherResearchProduct, Publication, Qualifier, Relation, Result, Software, StructuredProperty, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, KeyValue, Oaf, OtherResearchProduct, Publication, Qualifier, Relation, Result, Software, StructuredProperty, Dataset => OafDataset}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.lang3.StringUtils
import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
import java.nio.charset.CodingErrorAction
import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.chrono.ThaiBuddhistDate
import java.time.format.DateTimeFormatter
import java.util.{Date, Locale}
import java.util.regex.Pattern
import scala.collection.JavaConverters._
import scala.io.{Codec, Source}
case class DataciteType(doi: String, timestamp: Long, isActive: Boolean, json: String) {}
case class NameIdentifiersType(nameIdentifierScheme: Option[String], schemeUri: Option[String], nameIdentifier: Option[String]) {}
case class CreatorType(nameType: Option[String], nameIdentifiers: Option[List[NameIdentifiersType]], name: Option[String], familyName: Option[String], givenName: Option[String], affiliation: Option[List[String]]) {}
case class TitleType(title: Option[String], titleType: Option[String], lang: Option[String]) {}
case class SubjectType(subject: Option[String], subjectScheme: Option[String]) {}
case class DescriptionType(descriptionType: Option[String], description: Option[String]) {}
case class FundingReferenceType(funderIdentifierType: Option[String], awardTitle: Option[String], awardUri: Option[String], funderName: Option[String], funderIdentifier: Option[String], awardNumber: Option[String]) {}
case class DateType(date: Option[String], dateType: Option[String]) {}
case class HostedByMapType(openaire_id: String, datacite_name: String, official_name: String, similarity: Option[Float]) {}
object DataciteToOAFTransformation {
val UNKNOWN_REPOSITORY_ORIGINALID = "openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18"
val DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254"
val DNET_DATACITE_DATE = "dnet:dataCite_date"
val DNET_DATACITE_TITLE = "dnet:dataCite_title"
val SYSIMPORT_ACTIONSET = "sysimport:actionset"
val DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions"
val PROVENANCE_ACTION_SET_QUALIFIER: Qualifier = OafMapperUtils.qualifier(SYSIMPORT_ACTIONSET, SYSIMPORT_ACTIONSET, DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS)
val MAIN_TITLE_QUALIFIER:Qualifier = OafMapperUtils.qualifier("main title","main title",DNET_DATACITE_TITLE,DNET_DATACITE_TITLE)
implicit val codec: Codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
val DOI_CLASS = "doi"
val SUBJ_CLASS = "keywords"
val j_filter: List[String] = {
val s = Source.fromInputStream(getClass.getResourceAsStream("datacite_filter")).mkString
s.lines.toList
}
val mapper = new ObjectMapper()
val unknown_repository: HostedByMapType = HostedByMapType(UNKNOWN_REPOSITORY_ORIGINALID, ModelConstants.UNKNOWN_REPOSITORY.getValue, ModelConstants.UNKNOWN_REPOSITORY.getValue, Some(1.0F))
val dataInfo: DataInfo = generateDataInfo("0.9")
val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue(DATACITE_ID, "Datacite")
val hostedByMap: Map[String, HostedByMapType] = {
val s = Source.fromInputStream(getClass.getResourceAsStream("hostedBy_map.json")).mkString
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: org.json4s.JValue = parse(s)
json.extract[Map[String, HostedByMapType]]
}
val df_en: DateTimeFormatter = DateTimeFormatter.ofPattern("[MM-dd-yyyy][MM/dd/yyyy][dd-MM-yy][dd-MMM-yyyy][dd/MMM/yyyy][dd-MMM-yy][dd/MMM/yy][dd-MM-yy][dd/MM/yy][dd-MM-yyyy][dd/MM/yyyy][yyyy-MM-dd][yyyy/MM/dd]", Locale.ENGLISH)
val df_it: DateTimeFormatter = DateTimeFormatter.ofPattern("[dd-MM-yyyy][dd/MM/yyyy]", Locale.ITALIAN)
val funder_regex: List[(Pattern, String)] = List(
(Pattern.compile("(info:eu-repo/grantagreement/ec/h2020/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda__h2020::"),
(Pattern.compile("(info:eu-repo/grantagreement/ec/fp7/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda_______::")
)
val Date_regex: List[Pattern] = List(
//Y-M-D
Pattern.compile("(18|19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])", Pattern.MULTILINE),
//M-D-Y
Pattern.compile("((0[1-9]|1[012])|([1-9]))([- /.])(0[1-9]|[12][0-9]|3[01])([- /.])(18|19|20)?\\d\\d", Pattern.MULTILINE),
//D-M-Y
Pattern.compile("(?:(?:31(/|-|\\.)(?:0?[13578]|1[02]|(?:Jan|Mar|May|Jul|Aug|Oct|Dec)))\\1|(?:(?:29|30)(/|-|\\.)(?:0?[1,3-9]|1[0-2]|(?:Jan|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))\\2))(?:(?:1[6-9]|[2-9]\\d)?\\d{2})|(?:29(/|-|\\.)(?:0?2|(?:Feb))\\3(?:(?:(?:1[6-9]|[2-9]\\d)?(?:0[48]|[2468][048]|[13579][26])|(?:(?:16|[2468][048]|[3579][26])00))))|(?:0?[1-9]|1\\d|2[0-8])(/|-|\\.)(?:(?:0?[1-9]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep))|(?:1[0-2]|(?:Oct|Nov|Dec)))\\4(?:(?:1[6-9]|[2-9]\\d)?\\d{2})", Pattern.MULTILINE),
//Y
Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE)
)
def filter_json(json: String): Boolean = {
j_filter.exists(f => json.contains(f))
}
def toActionSet(item: Oaf): (String, String) = {
val mapper = new ObjectMapper()
item match {
case dataset: OafDataset =>
val a: AtomicAction[OafDataset] = new AtomicAction[OafDataset]
a.setClazz(classOf[OafDataset])
a.setPayload(dataset)
(dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
case publication: Publication =>
val a: AtomicAction[Publication] = new AtomicAction[Publication]
a.setClazz(classOf[Publication])
a.setPayload(publication)
(publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
case software: Software =>
val a: AtomicAction[Software] = new AtomicAction[Software]
a.setClazz(classOf[Software])
a.setPayload(software)
(software.getClass.getCanonicalName, mapper.writeValueAsString(a))
case orp: OtherResearchProduct =>
val a: AtomicAction[OtherResearchProduct] = new AtomicAction[OtherResearchProduct]
a.setClazz(classOf[OtherResearchProduct])
a.setPayload(orp)
(orp.getClass.getCanonicalName, mapper.writeValueAsString(a))
case relation: Relation =>
val a: AtomicAction[Relation] = new AtomicAction[Relation]
a.setClazz(classOf[Relation])
a.setPayload(relation)
(relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
case _ =>
null
}
}
def embargo_end(embargo_end_date: String): Boolean = {
val dt = LocalDate.parse(embargo_end_date, DateTimeFormatter.ofPattern("[yyyy-MM-dd]"))
val td = LocalDate.now()
td.isAfter(dt)
}
def extract_date(input: String): Option[String] = {
val d = Date_regex.map(pattern => {
val matcher = pattern.matcher(input)
if (matcher.find())
matcher.group(0)
else
null
}
).find(s => s != null)
if (d.isDefined) {
val a_date = if (d.get.length == 4) s"01-01-${d.get}" else d.get
try {
return Some(LocalDate.parse(a_date, df_en).toString)
} catch {
case _: Throwable => try {
return Some(LocalDate.parse(a_date, df_it).toString)
} catch {
case _: Throwable =>
return None
}
}
}
d
}
def fix_thai_date(input:String, format:String) :String = {
try {
val a_date = LocalDate.parse(input,DateTimeFormatter.ofPattern(format))
val d = ThaiBuddhistDate.of(a_date.getYear, a_date.getMonth.getValue, a_date.getDayOfMonth)
LocalDate.from(d).toString
} catch {
case _: Throwable => ""
}
}
def getTypeQualifier(resourceType: String, resourceTypeGeneral: String, schemaOrg: String, vocabularies: VocabularyGroup): (Qualifier, Qualifier) = {
if (resourceType != null && resourceType.nonEmpty) {
val typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, resourceType)
if (typeQualifier != null)
return (typeQualifier, vocabularies.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, typeQualifier.getClassid))
}
if (schemaOrg != null && schemaOrg.nonEmpty) {
val typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, schemaOrg)
if (typeQualifier != null)
return (typeQualifier, vocabularies.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, typeQualifier.getClassid))
}
if (resourceTypeGeneral != null && resourceTypeGeneral.nonEmpty) {
val typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, resourceTypeGeneral)
if (typeQualifier != null)
return (typeQualifier, vocabularies.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, typeQualifier.getClassid))
}
null
}
def getResult(resourceType: String, resourceTypeGeneral: String, schemaOrg: String, vocabularies: VocabularyGroup): Result = {
val typeQualifiers: (Qualifier, Qualifier) = getTypeQualifier(resourceType, resourceTypeGeneral, schemaOrg, vocabularies)
if (typeQualifiers == null)
return null
val i = new Instance
i.setInstancetype(typeQualifiers._1)
typeQualifiers._2.getClassname match {
case "dataset" =>
val r = new OafDataset
r.setInstance(List(i).asJava)
return r
case "publication" =>
val r = new Publication
r.setInstance(List(i).asJava)
return r
case "software" =>
val r = new Software
r.setInstance(List(i).asJava)
return r
case "other" =>
val r = new OtherResearchProduct
r.setInstance(List(i).asJava)
return r
}
null
}
def available_date(input: String): Boolean = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: org.json4s.JValue = parse(input)
val l: List[String] = for {
JObject(dates) <- json \\ "dates"
JField("dateType", JString(dateTypes)) <- dates
} yield dateTypes
l.exists(p => p.equalsIgnoreCase("available"))
}
def OPEN_ACCESS_RIGHT = {
val result = new Qualifier
result.setClassid("OPEN")
result.setClassid("OPEN")
result.setSchemeid(ModelConstants.DNET_ACCESS_MODES)
result.setSchemename(ModelConstants.DNET_ACCESS_MODES)
result
}
/**
* As describe in ticket #6377
* when the result come from figshare we need to remove subject
* and set Access rights OPEN.
* @param r
*/
def fix_figshare(r: Result): Unit = {
if (r.getInstance() != null) {
val hosted_by_figshare = r.getInstance().asScala.exists(i => i.getHostedby != null && "figshare".equalsIgnoreCase(i.getHostedby.getValue))
if (hosted_by_figshare) {
r.getInstance().asScala.foreach(i => i.setAccessright(OPEN_ACCESS_RIGHT))
val l: List[StructuredProperty] = List()
r.setSubject(l.asJava)
}
}
}
def generateOAFDate(dt: String, q: Qualifier): StructuredProperty = {
OafMapperUtils.structuredProperty(dt, q, null)
}
def generateRelation(sourceId: String, targetId: String, relClass: String, cf: KeyValue, di: DataInfo): Relation = {
val r = new Relation
r.setSource(sourceId)
r.setTarget(targetId)
r.setRelType(ModelConstants.RESULT_PROJECT)
r.setRelClass(relClass)
r.setSubRelType(ModelConstants.OUTCOME)
r.setCollectedfrom(List(cf).asJava)
r.setDataInfo(di)
r
}
def get_projectRelation(awardUri: String, sourceId: String): List[Relation] = {
val match_pattern = funder_regex.find(s => s._1.matcher(awardUri).find())
if (match_pattern.isDefined) {
val m = match_pattern.get._1
val p = match_pattern.get._2
val grantId = m.matcher(awardUri).replaceAll("$2")
val targetId = s"$p${DHPUtils.md5(grantId)}"
List(
generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo),
generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo)
)
}
else
List()
}
def generateOAF(input: String, ts: Long, dateOfCollection: Long, vocabularies: VocabularyGroup): List[Oaf] = {
if (filter_json(input))
return List()
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json = parse(input)
val resourceType = (json \ "attributes" \ "types" \ "resourceType").extractOrElse[String](null)
val resourceTypeGeneral = (json \ "attributes" \ "types" \ "resourceTypeGeneral").extractOrElse[String](null)
val schemaOrg = (json \ "attributes" \ "types" \ "schemaOrg").extractOrElse[String](null)
val doi = (json \ "attributes" \ "doi").extract[String]
if (doi.isEmpty)
return List()
//Mapping type based on vocabularies dnet:publication_resource and dnet:result_typologies
val result = getResult(resourceType, resourceTypeGeneral, schemaOrg, vocabularies)
if (result == null)
return List()
val doi_q = OafMapperUtils.qualifier("doi", "doi", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES)
val pid = OafMapperUtils.structuredProperty(doi, doi_q, dataInfo)
result.setPid(List(pid).asJava)
result.setId(OafMapperUtils.createOpenaireId(50, s"datacite____::$doi", true))
result.setOriginalId(List(doi).asJava)
val d = new Date(dateOfCollection * 1000)
val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US)
result.setDateofcollection(ISO8601FORMAT.format(d))
result.setDateoftransformation(ISO8601FORMAT.format(ts))
result.setDataInfo(dataInfo)
val creators = (json \\ "creators").extractOrElse[List[CreatorType]](List())
val authors = creators.zipWithIndex.map { case (c, idx) =>
val a = new Author
a.setFullname(c.name.orNull)
a.setName(c.givenName.orNull)
a.setSurname(c.familyName.orNull)
if (c.nameIdentifiers != null && c.nameIdentifiers.isDefined && c.nameIdentifiers.get != null) {
a.setPid(c.nameIdentifiers.get.map(ni => {
val q = if (ni.nameIdentifierScheme.isDefined) vocabularies.getTermAsQualifier(ModelConstants.DNET_PID_TYPES, ni.nameIdentifierScheme.get.toLowerCase()) else null
if (ni.nameIdentifier != null && ni.nameIdentifier.isDefined) {
OafMapperUtils.structuredProperty(ni.nameIdentifier.get, q, dataInfo)
}
else
null
}
)
.asJava)
}
if (c.affiliation.isDefined)
a.setAffiliation(c.affiliation.get.filter(af => af.nonEmpty).map(af => OafMapperUtils.field(af, dataInfo)).asJava)
a.setRank(idx + 1)
a
}
val titles: List[TitleType] = (json \\ "titles").extractOrElse[List[TitleType]](List())
result.setTitle(titles.filter(t => t.title.nonEmpty).map(t => {
if (t.titleType.isEmpty) {
OafMapperUtils.structuredProperty(t.title.get, MAIN_TITLE_QUALIFIER, null)
} else {
OafMapperUtils.structuredProperty(t.title.get, t.titleType.get, t.titleType.get, DNET_DATACITE_TITLE, DNET_DATACITE_TITLE, null)
}
}).asJava)
if (authors == null || authors.isEmpty || !authors.exists(a => a != null))
return List()
result.setAuthor(authors.asJava)
val dates = (json \\ "dates").extract[List[DateType]]
val publication_year = (json \\ "publicationYear").extractOrElse[String](null)
val i_date = dates
.filter(d => d.date.isDefined && d.dateType.isDefined)
.find(d => d.dateType.get.equalsIgnoreCase("issued"))
.map(d => extract_date(d.date.get))
val a_date: Option[String] = dates
.filter(d => d.date.isDefined && d.dateType.isDefined && d.dateType.get.equalsIgnoreCase("available"))
.map(d => extract_date(d.date.get))
.find(d => d != null && d.isDefined)
.map(d => d.get)
if (a_date.isDefined) {
if(doi.startsWith("10.14457"))
result.setEmbargoenddate(OafMapperUtils.field(fix_thai_date(a_date.get,"[yyyy-MM-dd]"), null))
else
result.setEmbargoenddate(OafMapperUtils.field(a_date.get, null))
}
if (i_date.isDefined && i_date.get.isDefined) {
if(doi.startsWith("10.14457")) {
result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get,"[yyyy-MM-dd]"), null))
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get,"[yyyy-MM-dd]"), null))
}
else {
result.setDateofacceptance(OafMapperUtils.field(i_date.get.get, null))
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(i_date.get.get, null))
}
}
else if (publication_year != null) {
if(doi.startsWith("10.14457")) {
result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year","[dd-MM-yyyy]"), null))
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year","[dd-MM-yyyy]"), null))
} else {
result.setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null))
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null))
}
}
result.setRelevantdate(dates.filter(d => d.date.isDefined && d.dateType.isDefined)
.map(d => (extract_date(d.date.get), d.dateType.get))
.filter(d => d._1.isDefined)
.map(d => (d._1.get, vocabularies.getTermAsQualifier(DNET_DATACITE_DATE, d._2.toLowerCase())))
.filter(d => d._2 != null)
.map(d => generateOAFDate(d._1, d._2)).asJava)
val subjects = (json \\ "subjects").extract[List[SubjectType]]
result.setSubject(subjects.filter(s => s.subject.nonEmpty)
.map(s =>
OafMapperUtils.structuredProperty(s.subject.get, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, null)
).asJava)
result.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
val descriptions = (json \\ "descriptions").extract[List[DescriptionType]]
result.setDescription(
descriptions
.filter(d => d.description.isDefined).
map(d =>
OafMapperUtils.field(d.description.get, null)
).filter(s => s != null).asJava)
val publisher = (json \\ "publisher").extractOrElse[String](null)
if (publisher != null)
result.setPublisher(OafMapperUtils.field(publisher, null))
val language: String = (json \\ "language").extractOrElse[String](null)
if (language != null)
result.setLanguage(vocabularies.getSynonymAsQualifier(ModelConstants.DNET_LANGUAGES, language))
val instance = result.getInstance().get(0)
val client = (json \ "relationships" \ "client" \\ "id").extractOpt[String]
val accessRights: List[String] = for {
JObject(rightsList) <- json \\ "rightsList"
JField("rightsUri", JString(rightsUri)) <- rightsList
} yield rightsUri
val aRights: Option[Qualifier] = accessRights.map(r => {
vocabularies.getSynonymAsQualifier(ModelConstants.DNET_ACCESS_MODES, r)
}).find(q => q != null).map(q => {
val a = new Qualifier
a.setClassid(q.getClassid)
a.setClassname(q.getClassname)
a.setSchemeid(q.getSchemeid)
a.setSchemename(q.getSchemename)
a
})
val access_rights_qualifier = if (aRights.isDefined) aRights.get else OafMapperUtils.qualifier(ModelConstants.UNKNOWN, ModelConstants.NOT_AVAILABLE, ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
if (client.isDefined) {
val hb = hostedByMap.getOrElse(client.get.toUpperCase(), unknown_repository)
instance.setHostedby(OafMapperUtils.keyValue(generateDSId(hb.openaire_id), hb.official_name))
instance.setCollectedfrom(DATACITE_COLLECTED_FROM)
instance.setUrl(List(s"https://dx.doi.org/$doi").asJava)
instance.setAccessright(access_rights_qualifier)
val license = accessRights
.find(r => r.startsWith("http") && r.matches(".*(/licenses|/publicdomain|unlicense\\.org/|/legal-and-data-protection-notices|/download/license|/open-government-licence).*"))
if (license.isDefined)
instance.setLicense(OafMapperUtils.field(license.get, null))
}
val awardUris: List[String] = for {
JObject(fundingReferences) <- json \\ "fundingReferences"
JField("awardUri", JString(awardUri)) <- fundingReferences
} yield awardUri
val relations: List[Relation] = awardUris.flatMap(a => get_projectRelation(a, result.getId)).filter(r => r != null)
fix_figshare(result)
if (relations != null && relations.nonEmpty) {
List(result) ::: relations
}
else
List(result)
}
def generateDataInfo(trust: String): DataInfo = {
val di = new DataInfo
di.setDeletedbyinference(false)
di.setInferred(false)
di.setInvisible(false)
di.setTrust(trust)
di.setProvenanceaction(PROVENANCE_ACTION_SET_QUALIFIER)
di
}
def generateDSId(input: String): String = {
val b = StringUtils.substringBefore(input, "::")
val a = StringUtils.substringAfter(input, "::")
s"10|$b::${DHPUtils.md5(a)}"
}
}

View File

@ -0,0 +1,40 @@
package eu.dnetlib.dhp.actionmanager.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object ExportActionSetJobNode {
val log: Logger = LoggerFactory.getLogger(ExportActionSetJobNode.getClass)
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/exportDataset_parameters.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
val targetPath = parser.get("targetPath")
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(ExportActionSetJobNode.getClass.getSimpleName)
.master(master)
.getOrCreate()
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val tEncoder:Encoder[(String,String)] = Encoders.tuple(Encoders.STRING,Encoders.STRING)
spark.read.load(sourcePath).as[Oaf]
.map(o =>DataciteToOAFTransformation.toActionSet(o))
.filter(o => o!= null)
.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
}
}

View File

@ -0,0 +1,43 @@
package eu.dnetlib.dhp.actionmanager.datacite
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object GenerateDataciteDatasetSpark {
val log: Logger = LoggerFactory.getLogger(GenerateDataciteDatasetSpark.getClass)
def main(args: Array[String]): Unit = {
val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/generate_dataset_params.json")).mkString)
parser.parseArgument(args)
val master = parser.get("master")
val sourcePath = parser.get("sourcePath")
val targetPath = parser.get("targetPath")
val isLookupUrl: String = parser.get("isLookupUrl")
log.info("isLookupUrl: {}", isLookupUrl)
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(GenerateDataciteDatasetSpark.getClass.getSimpleName)
.master(master)
.getOrCreate()
implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
import spark.implicits._
spark.read.load(sourcePath).as[DataciteType]
.filter(d => d.isActive)
.flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies))
.filter(d => d != null)
.write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -18,7 +18,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.utils.CSVProject;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
@ -33,7 +32,6 @@ public class PrepareProjects {
private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils

View File

@ -0,0 +1,28 @@
TUBYDI - Assistir Filmes e Series Online Grátis
123Movies
WATCH FULL MOVIE
Movierulz
Full Movie Online
MOVIé WatcH
The King of Staten Island 2020 Online For Free
Watch Train to Busan 2 2020 online for free
Sixth Sense Movie Novelization
Film Complet streaming vf gratuit en ligne
watch now free
LIVE stream watch
LIVE stream UFC
RBC Heritage live stream
MLBStreams Free
NFL Live Stream
Live Stream Free
Royal Ascot 2020 Live Stream
TV Shows Full Episodes Official
FuboTV
Gomovies
Online Free Trial Access
123watch
DÜŞÜK HAPI
Bebek Düşürme Yöntemleri
WHATSAP İLETİŞİM
Cytotec
düşük hapı

View File

@ -0,0 +1,21 @@
[
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the source mdstore path",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "targetPath",
"paramDescription": "the target mdstore path",
"paramRequired": true
},
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "the master name",
"paramRequired": true
}
]

View File

@ -0,0 +1,26 @@
[
{
"paramName": "s",
"paramLongName": "sourcePath",
"paramDescription": "the source mdstore path",
"paramRequired": true
},
{
"paramName": "t",
"paramLongName": "targetPath",
"paramDescription": "the target mdstore path",
"paramRequired": true
},
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "the master name",
"paramRequired": true
},
{
"paramName": "i",
"paramLongName": "isLookupUrl",
"paramDescription": "the isLookup URL",
"paramRequired": true
}
]

View File

@ -0,0 +1,23 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,46 @@
<workflow-app name="Import_Datacite_and_transform_to_OAF" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mainPath</name>
<description>the working path of Datacite stores</description>
</property>
<property>
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
</property>
</parameters>
<start to="TransformJob"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="TransformJob">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>TransformJob</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.GenerateDataciteDatasetSpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${mainPath}/datacite_dump</arg>
<arg>--targetPath</arg><arg>${mainPath}/production/datacite_oaf</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,23 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

View File

@ -0,0 +1,46 @@
<workflow-app name="Datacite_to_ActionSet_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the working path of Datacite stores</description>
</property>
<property>
<name>outputPath</name>
<description>the path of Datacite ActionSet</description>
</property>
</parameters>
<start to="ExportDataset"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ExportDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExportDataset</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.ExportActionSetJobNode</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${outputPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,50 @@
package eu.dentlib.dhp.aggregation;
import static org.mockito.Mockito.lenient;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.commons.io.IOUtils;
import org.mockito.Mock;
import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
public abstract class AbstractVocabularyTest {
@Mock
protected ISLookUpService isLookUpService;
protected VocabularyGroup vocabularies;
public void setUpVocabulary() throws ISLookUpException, IOException {
lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
lenient()
.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
.thenReturn(synonyms());
vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService);
}
private static List<String> vocs() throws IOException {
return IOUtils
.readLines(
Objects
.requireNonNull(
AbstractVocabularyTest.class.getResourceAsStream("/eu/dnetlib/dhp/vocabulary/terms.txt")));
}
private static List<String> synonyms() throws IOException {
return IOUtils
.readLines(
Objects
.requireNonNull(
AbstractVocabularyTest.class.getResourceAsStream("/eu/dnetlib/dhp/vocabulary/synonyms.txt")));
}
}

View File

@ -0,0 +1,43 @@
package eu.dnetlib.dhp.actionmanager.datacite
import eu.dentlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.junit.jupiter.MockitoExtension
import org.codehaus.jackson.map.ObjectMapper
import scala.io.Source
@ExtendWith(Array(classOf[MockitoExtension]))
class DataciteToOAFTest extends AbstractVocabularyTest{
@BeforeEach
def setUp() :Unit = {
super.setUpVocabulary()
}
@Test
def testMapping() :Unit = {
val record =Source.fromInputStream(getClass.getResourceAsStream("datacite.json")).mkString
val mapper = new ObjectMapper()
val res:List[Oaf] =DataciteToOAFTransformation.generateOAF(record, 0L,0L, vocabularies )
println (mapper.defaultPrettyPrintingWriter().writeValueAsString(res.head))
}
@Test
def testDate():Unit = {
println(DataciteToOAFTransformation.fix_thai_date("01-01-2561","[dd-MM-yyyy]"))
println(DataciteToOAFTransformation.fix_thai_date("2561-01-01","[yyyy-MM-dd]"))
}
}

File diff suppressed because one or more lines are too long

View File

@ -18,7 +18,7 @@ import eu.dnetlib.dhp.schema.oaf.Field;
public class DatePicker {
private static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}";
private static final String DATE_PATTERN = "^(\\d{4})-(\\d{2})-(\\d{2})";
private static final String DATE_DEFAULT_SUFFIX = "01-01";
private static final int YEAR_LB = 1300;
private static final int YEAR_UB = Year.now().getValue() + 5;
@ -28,6 +28,7 @@ public class DatePicker {
final Map<String, Integer> frequencies = dateofacceptance
.parallelStream()
.filter(StringUtils::isNotBlank)
.map(d -> substringBefore(d, "T"))
.collect(Collectors.toConcurrentMap(w -> w, w -> 1, Integer::sum));
if (frequencies.isEmpty()) {

View File

@ -0,0 +1,44 @@
package eu.dnetlib.dhp.oa.dedup;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Collection;
import org.junit.jupiter.api.Test;
import com.clearspring.analytics.util.Lists;
public class DatePickerTest {
Collection<String> dates = Lists.newArrayList();
@Test
public void testPickISO() {
dates.add("2016-01-01T12:00:00Z");
dates.add("2016-06-16T12:00:00Z");
dates.add("2020-01-01T12:00:00Z");
dates.add("2020-10-01T12:00:00Z");
assertEquals("2020-10-01", DatePicker.pick(dates).getValue());
}
@Test
public void testPickSimple() {
dates.add("2016-01-01");
dates.add("2016-06-16");
dates.add("2020-01-01");
dates.add("2020-10-01");
assertEquals("2020-10-01", DatePicker.pick(dates).getValue());
}
@Test
public void testPickFrequent() {
dates.add("2016-02-01");
dates.add("2016-02-01");
dates.add("2016-02-01");
dates.add("2020-10-01");
assertEquals("2016-02-01", DatePicker.pick(dates).getValue());
}
}

View File

@ -33,9 +33,9 @@ object SparkMapDumpIntoOAF {
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderRelatons: Encoder[Relation] = Encoders.kryo[Relation]
implicit val mapEncoderDatasets: Encoder[oaf.Dataset] = Encoders.kryo[OafDataset]
implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
val targetPath = parser.get("targetPath")
import spark.implicits._
spark.read.load(parser.get("sourcePath")).as[CrossrefDT]
.flatMap(k => Crossref2Oaf.convert(k.json))

View File

@ -1,42 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

View File

@ -1,372 +0,0 @@
<workflow-app name="Generate DOIBoost ActionSet" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorIntersectionMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<!-- Itersection Parameters -->
<property>
<name>workingPath</name>
<description>the working Path</description>
</property>
<property>
<name>hostedByMapPath</name>
<description>the hostedByMap Path</description>
</property>
<property>
<name>outputPath</name>
<description>the Path of the sequence file action set</description>
</property>
<!-- Crossref Parameters -->
<property>
<name>inputPathCrossref</name>
<description>the Crossref input path</description>
</property>
<property>
<name>crossrefTimestamp</name>
<description>Timestamp for the Crossref incremental Harvesting</description>
</property>
<property>
<name>esServer</name>
<description>elasticsearch server url for the Crossref Harvesting</description>
</property>
<property>
<name>esIndex</name>
<description>elasticsearch index name for the Crossref Harvesting</description>
</property>
<!-- MAG Parameters -->
<property>
<name>MAGDumpPath</name>
<description>the MAG dump working path</description>
</property>
<property>
<name>inputPathMAG</name>
<description>the MAG working path</description>
</property>
<!-- UnpayWall Parameters -->
<property>
<name>inputPathUnpayWall</name>
<description>the UnpayWall working path</description>
</property>
<!-- ORCID Parameters -->
<property>
<name>inputPathOrcid</name>
<description>the ORCID input path</description>
</property>
<property>
<name>workingPathOrcid</name>
<description>the ORCID working path</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resume_from"/>
<decision name="resume_from">
<switch>
<case to="ConvertCrossrefToOAF">${wf:conf('resumeFrom') eq 'ConvertCrossrefToOAF'}</case>
<case to="ResetMagWorkingPath">${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'}</case>
<case to="ProcessMAG">${wf:conf('resumeFrom') eq 'PreprocessMag'}</case>
<case to="ProcessUW">${wf:conf('resumeFrom') eq 'PreprocessUW'}</case>
<case to="ProcessORCID">${wf:conf('resumeFrom') eq 'PreprocessORCID'}</case>
<case to="CreateDOIBoost">${wf:conf('resumeFrom') eq 'CreateDOIBoost'}</case>
<case to="GenerateActionSet">${wf:conf('resumeFrom') eq 'GenerateActionSet'}</case>
<default to="ImportCrossRef"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ImportCrossRef">
<java>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>--targetPath</arg><arg>${inputPathCrossref}/index_update</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--esServer</arg><arg>${esServer}</arg>
<arg>--esIndex</arg><arg>${esIndex}</arg>
<arg>--timestamp</arg><arg>${crossrefTimestamp}</arg>
</java>
<ok to="GenerateCrossrefDataset"/>
<error to="Kill"/>
</action>
<!-- CROSSREF SECTION -->
<action name="GenerateCrossrefDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>GenerateCrossrefDataset</name>
<class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--workingPath</arg><arg>${inputPathCrossref}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="RenameDataset"/>
<error to="Kill"/>
</action>
<action name="RenameDataset">
<fs>
<delete path="${inputPathCrossref}/crossref_ds"/>
<move source="${inputPathCrossref}/crossref_ds_updated"
target="${inputPathCrossref}/crossref_ds"/>
</fs>
<ok to="ResetMagWorkingPath"/>
<error to="Kill"/>
</action>
<!-- MAG SECTION -->
<action name="ResetMagWorkingPath">
<fs>
<delete path="${inputPathMAG}/dataset"/>
<delete path="${inputPathMAG}/process"/>
</fs>
<ok to="ConvertMagToDataset"/>
<error to="Kill"/>
</action>
<action name="ConvertMagToDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${MAGDumpPath}</arg>
<arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="ConvertCrossrefToOAF"/>
<error to="Kill"/>
</action>
<action name="ConvertCrossrefToOAF">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ConvertCrossrefToOAF</name>
<class>eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
<arg>--targetPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="ProcessMAG"/>
<error to="Kill"/>
</action>
<action name="ProcessMAG">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to OAF Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkProcessMAG</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorIntersectionMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${inputPathMAG}/dataset</arg>
<arg>--workingPath</arg><arg>${inputPathMAG}/process</arg>
<arg>--targetPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="ProcessUW"/>
<error to="Kill"/>
</action>
<!-- UnpayWall SECTION -->
<action name="ProcessUW">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert UnpayWall to Dataset</name>
<class>eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${inputPathUnpayWall}/uw_extracted</arg>
<arg>--targetPath</arg><arg>${workingPath}/uwPublication</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="ProcessORCID"/>
<error to="Kill"/>
</action>
<!-- ORCID SECTION -->
<action name="ProcessORCID">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert ORCID to Dataset</name>
<class>eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
<arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
<arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="CreateDOIBoost"/>
<error to="Kill"/>
</action>
<!-- INTERSECTION SECTION-->
<action name="CreateDOIBoost">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create DOIBoost Infospace</name>
<class>eu.dnetlib.doiboost.SparkGenerateDoiBoost</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorIntersectionMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
<arg>--affiliationPath</arg><arg>${inputPathMAG}/dataset/Affiliations</arg>
<arg>--paperAffiliationPath</arg><arg>${inputPathMAG}/dataset/PaperAuthorAffiliations</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="GenerateActionSet"/>
<error to="Kill"/>
</action>
<action name="GenerateActionSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Generate DOIBoost ActionSet</name>
<class>eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--dbPublicationPath</arg><arg>${workingPath}/doiBoostPublicationFiltered</arg>
<arg>--dbDatasetPath</arg><arg>${workingPath}/crossrefDataset</arg>
<arg>--crossRefRelation</arg><arg>${workingPath}/crossrefRelation</arg>
<arg>--dbaffiliationRelationPath</arg><arg>${workingPath}/doiBoostPublicationAffiliation</arg>
<arg>--dbOrganizationPath</arg><arg>${workingPath}/doiBoostOrganization</arg>
<arg>--targetPath</arg><arg>${workingPath}/actionDataSet</arg>
<arg>--sFilePath</arg><arg>${outputPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>