forked from D-Net/dnet-hadoop
Split the workflow collecting DataCite into two workflows.
Released on beta
parent 27681b876c
commit 616d2ecce2
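The change separates collection (ImportDatacite → TransformJob, kept in Transformation_Workflow) from the ActionSet export (ExportDataset, moved into the new Datacite_to_ActionSet_Workflow). The mapping applied by the transform step is the one exercised by the new DataciteToOAFTest at the end of this diff; a minimal sketch of that entry point, assuming a VocabularyGroup has already been loaded from the IS lookup service (the helper name is illustrative, not part of the commit):

import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.oaf.Oaf

// Sketch only: `vocabularies` is assumed to come from
// VocabularyGroup.loadVocsFromIS(isLookupService), as in GenerateDataciteDatasetSpark below.
// The two Long arguments are passed as 0L here, mirroring the new unit test.
def mapDataciteRecord(record: String, vocabularies: VocabularyGroup): List[Oaf] =
  DataciteToOAFTransformation.generateOAF(record, 0L, 0L, vocabularies)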
@@ -89,6 +89,13 @@ public class ModelConstants {
     public static final String UNKNOWN = "UNKNOWN";
     public static final String NOT_AVAILABLE = "not available";
 
+    public static final String ACTION_SET_SCHEME = "sysimport:actionset";
+
+    public static final String PROVENANCE_VOCABULARY = "dnet:provenanceActions";
+
+    public static final Qualifier ACTION_SET_PROVENANCE_QUALIFIER = qualifier(
+        ACTION_SET_SCHEME, ACTION_SET_SCHEME, PROVENANCE_VOCABULARY, PROVENANCE_VOCABULARY);
+
     public static final Qualifier PUBLICATION_DEFAULT_RESULTTYPE = qualifier(
         PUBLICATION_RESULTTYPE_CLASSID, PUBLICATION_RESULTTYPE_CLASSID,
         DNET_RESULT_TYPOLOGIES, DNET_RESULT_TYPOLOGIES);
@@ -3,7 +3,9 @@ package eu.dnetlib.dhp.actionmanager.datacite
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
 import eu.dnetlib.dhp.schema.action.AtomicAction
-import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, KeyValue, Oaf, OafMapperUtils, OtherResearchProduct, Publication, Qualifier, Relation, Result, Software, StructuredProperty, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.common.ModelConstants
+import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
+import eu.dnetlib.dhp.schema.oaf.{AccessRight, Author, DataInfo, Instance, KeyValue, Oaf, OafMapperUtils, OtherResearchProduct, Publication, Qualifier, Relation, Result, Software, StructuredProperty, Dataset => OafDataset}
 import eu.dnetlib.dhp.utils.DHPUtils
 import org.apache.commons.lang3.StringUtils
 import org.json4s.DefaultFormats
@@ -45,17 +47,11 @@ object DataciteToOAFTransformation {
   codec.onMalformedInput(CodingErrorAction.REPLACE)
   codec.onUnmappableCharacter(CodingErrorAction.REPLACE)



-  private val PID_VOCABULARY = "dnet:pid_types"
-  val COBJ_VOCABULARY = "dnet:publication_resource"
-  val RESULT_VOCABULARY = "dnet:result_typologies"
-  val ACCESS_MODE_VOCABULARY = "dnet:access_modes"
   val DOI_CLASS = "doi"

-  val TITLE_SCHEME = "dnet:dataCite_title"
   val SUBJ_CLASS = "keywords"
-  val SUBJ_SCHEME = "dnet:subject_classification_typologies"

   val j_filter: List[String] = {
     val s = Source.fromInputStream(getClass.getResourceAsStream("datacite_filter")).mkString
@@ -66,7 +62,7 @@ object DataciteToOAFTransformation {
   val unknown_repository: HostedByMapType = HostedByMapType("openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18", "Unknown Repository", "Unknown Repository", Some(1.0F))

   val dataInfo: DataInfo = generateDataInfo("0.9")
-  val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue("openaire____::9e3be59865b2c1c335d32dae2fe7b254", "Datacite")
+  val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue(ModelConstants.DATACITE_ID, "Datacite")

   val hostedByMap: Map[String, HostedByMapType] = {
     val s = Source.fromInputStream(getClass.getResourceAsStream("hostedBy_map.json")).mkString
@@ -174,20 +170,20 @@ object DataciteToOAFTransformation {

   def getTypeQualifier(resourceType: String, resourceTypeGeneral: String, schemaOrg: String, vocabularies: VocabularyGroup): (Qualifier, Qualifier) = {
     if (resourceType != null && resourceType.nonEmpty) {
-      val typeQualifier = vocabularies.getSynonymAsQualifier(COBJ_VOCABULARY, resourceType)
+      val typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, resourceType)
       if (typeQualifier != null)
-        return (typeQualifier, vocabularies.getSynonymAsQualifier(RESULT_VOCABULARY, typeQualifier.getClassid))
+        return (typeQualifier, vocabularies.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, typeQualifier.getClassid))
     }
     if (schemaOrg != null && schemaOrg.nonEmpty) {
-      val typeQualifier = vocabularies.getSynonymAsQualifier(COBJ_VOCABULARY, schemaOrg)
+      val typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, schemaOrg)
       if (typeQualifier != null)
-        return (typeQualifier, vocabularies.getSynonymAsQualifier(RESULT_VOCABULARY, typeQualifier.getClassid))
+        return (typeQualifier, vocabularies.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, typeQualifier.getClassid))

     }
     if (resourceTypeGeneral != null && resourceTypeGeneral.nonEmpty) {
-      val typeQualifier = vocabularies.getSynonymAsQualifier(COBJ_VOCABULARY, resourceTypeGeneral)
+      val typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, resourceTypeGeneral)
       if (typeQualifier != null)
-        return (typeQualifier, vocabularies.getSynonymAsQualifier(RESULT_VOCABULARY, typeQualifier.getClassid))
+        return (typeQualifier, vocabularies.getSynonymAsQualifier(ModelConstants.DNET_RESULT_TYPOLOGIES, typeQualifier.getClassid))

     }
     null
@@ -295,7 +291,7 @@ object DataciteToOAFTransformation {
       return List()


-    val doi_q = vocabularies.getSynonymAsQualifier(PID_VOCABULARY, "doi")
+    val doi_q = OafMapperUtils.qualifier("doi", "doi", ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES)
     val pid = OafMapperUtils.structuredProperty(doi, doi_q, dataInfo)
     result.setPid(List(pid).asJava)
     result.setId(OafMapperUtils.createOpenaireId(50, s"datacite____::$doi", true))
@@ -319,7 +315,7 @@ object DataciteToOAFTransformation {
         a.setSurname(c.familyName.orNull)
         if (c.nameIdentifiers != null && c.nameIdentifiers.isDefined && c.nameIdentifiers.get != null) {
           a.setPid(c.nameIdentifiers.get.map(ni => {
-            val q = if (ni.nameIdentifierScheme.isDefined) vocabularies.getTermAsQualifier(PID_VOCABULARY, ni.nameIdentifierScheme.get.toLowerCase()) else null
+            val q = if (ni.nameIdentifierScheme.isDefined) vocabularies.getTermAsQualifier(ModelConstants.DNET_PID_TYPES, ni.nameIdentifierScheme.get.toLowerCase()) else null
             if (ni.nameIdentifier != null && ni.nameIdentifier.isDefined) {
               OafMapperUtils.structuredProperty(ni.nameIdentifier.get, q, dataInfo)
             }
@@ -343,9 +339,9 @@ object DataciteToOAFTransformation {

     result.setTitle(titles.filter(t => t.title.nonEmpty).map(t => {
       if (t.titleType.isEmpty) {
-        OafMapperUtils.structuredProperty(t.title.get, "main title", "main title", TITLE_SCHEME, TITLE_SCHEME, null)
+        OafMapperUtils.structuredProperty(t.title.get, ModelConstants.MAIN_TITLE_QUALIFIER, null)
       } else {
-        OafMapperUtils.structuredProperty(t.title.get, t.titleType.get, t.titleType.get, TITLE_SCHEME, TITLE_SCHEME, null)
+        OafMapperUtils.structuredProperty(t.title.get, t.titleType.get, t.titleType.get, ModelConstants.DNET_DATACITE_TITLE, ModelConstants.DNET_DATACITE_TITLE, null)
       }
     }).asJava)

@@ -390,7 +386,7 @@ object DataciteToOAFTransformation {

     result.setSubject(subjects.filter(s => s.subject.nonEmpty)
       .map(s =>
-        OafMapperUtils.structuredProperty(s.subject.get, SUBJ_CLASS, SUBJ_CLASS, SUBJ_SCHEME, SUBJ_SCHEME, null)
+        OafMapperUtils.structuredProperty(s.subject.get, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, null)
       ).asJava)


@@ -426,28 +422,33 @@ object DataciteToOAFTransformation {
       JField("rightsUri", JString(rightsUri)) <- rightsList
     } yield rightsUri

-    val aRights: Option[Qualifier] = accessRights.map(r => {
-      vocabularies.getSynonymAsQualifier(ACCESS_MODE_VOCABULARY, r)
-    }).find(q => q != null)
+    val aRights: Option[AccessRight] = accessRights.map(r => {
+      vocabularies.getSynonymAsQualifier(ModelConstants.DNET_ACCESS_MODES, r)
+    }).find(q => q != null).map(q => {
+      val a = new AccessRight
+      a.setClassid(q.getClassid)
+      a.setClassname(q.getClassname)
+      a.setSchemeid(q.getSchemeid)
+      a.setSchemename(q.getSchemename)
+      a
+    })


-    val access_rights_qualifier = if (aRights.isDefined) aRights.get else OafMapperUtils.qualifier("UNKNOWN", "not available", ACCESS_MODE_VOCABULARY, ACCESS_MODE_VOCABULARY)
+    val access_rights_qualifier = if (aRights.isDefined) aRights.get else OafMapperUtils.accessRight("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)

     if (client.isDefined) {
       val hb = hostedByMap.getOrElse(client.get.toUpperCase(), unknown_repository)
       instance.setHostedby(OafMapperUtils.keyValue(generateDSId(hb.openaire_id), hb.official_name))
       instance.setCollectedfrom(DATACITE_COLLECTED_FROM)
       instance.setUrl(List(s"https://dx.doi.org/$doi").asJava)
-      // instance.setAccessright(access_rights_qualifier)
-      //'http') and matches(., '.*(/licenses|/publicdomain|unlicense.org/|/legal-and-data-protection-notices|/download/license|/open-government-licence).*')]">
+      instance.setAccessright(access_rights_qualifier)
+      instance.setPid(result.getPid)
       val license = accessRights
         .find(r => r.startsWith("http") && r.matches(".*(/licenses|/publicdomain|unlicense\\.org/|/legal-and-data-protection-notices|/download/license|/open-government-licence).*"))
       if (license.isDefined)
         instance.setLicense(OafMapperUtils.field(license.get, null))
     }


     val awardUris: List[String] = for {
       JObject(fundingReferences) <- json \\ "fundingReferences"
       JField("awardUri", JString(awardUri)) <- fundingReferences
@@ -455,6 +456,9 @@ object DataciteToOAFTransformation {

     val relations: List[Relation] = awardUris.flatMap(a => get_projectRelation(a, result.getId)).filter(r => r != null)

+    result.setId(IdentifierFactory.createIdentifier(result))
+    if (result.getId == null)
+      return List()
     if (relations != null && relations.nonEmpty) {
       List(result) ::: relations
     }
@@ -468,7 +472,7 @@ object DataciteToOAFTransformation {
     di.setInferred(false)
     di.setInvisible(false)
     di.setTrust(trust)
-    di.setProvenanceaction(OafMapperUtils.qualifier("sysimport:actionset", "sysimport:actionset", "dnet:provenanceActions", "dnet:provenanceActions"))
+    di.setProvenanceaction(ModelConstants.ACTION_SET_PROVENANCE_QUALIFIER)
     di
   }

@@ -27,7 +27,6 @@ object GenerateDataciteDatasetSpark {

     val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
     val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
-    log.info(s"vocabulary size is ${vocabularies.getTerms("dnet:languages").size()}")
     val spark: SparkSession = SparkSession.builder().config(conf)
       .appName(GenerateDataciteDatasetSpark.getClass.getSimpleName)
       .master(master)
@@ -148,7 +148,7 @@ object ImportDatacite {
       try {
         var start: Long = System.currentTimeMillis
         while (from < now) {
-          client = new DataciteAPIImporter(from, 1000, from + delta)
+          client = new DataciteAPIImporter(from, 100, from + delta)
           var end: Long = 0
           val key: IntWritable = new IntWritable(i)
           val value: Text = new Text
@@ -0,0 +1,23 @@
+<configuration>
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+</configuration>
@@ -0,0 +1,46 @@
+<workflow-app name="Datacite_to_ActionSet_Workflow" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>sourcePath</name>
+            <description>the working path of Datacite stores</description>
+        </property>
+        <property>
+            <name>outputPath</name>
+            <description>the path of Datacite ActionSet</description>
+        </property>
+    </parameters>
+
+    <start to="ExportDataset"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+
+    <action name="ExportDataset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>ExportDataset</name>
+            <class>eu.dnetlib.dhp.actionmanager.datacite.ExportActionSetJobNode</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+            <arg>--targetPath</arg><arg>${outputPath}</arg>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
@@ -12,12 +12,6 @@
     "paramDescription": "the target mdstore path",
     "paramRequired": true
   },
-  {
-    "paramName": "tr",
-    "paramLongName": "transformationRule",
-    "paramDescription": "the transformation Rule",
-    "paramRequired": true
-  },
   {
     "paramName": "m",
     "paramLongName": "master",
@@ -1,41 +1,21 @@
 <workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
     <parameters>
         <property>
-            <name>mdstoreInputPath</name>
-            <description>the path of the input MDStore</description>
-        </property>
-
-        <property>
-            <name>mdstoreOutputPath</name>
-            <description>the path of the cleaned mdstore</description>
+            <name>mainPath</name>
+            <description>the working path of Datacite stores</description>
         </property>
         <property>
-            <name>nativeInputPath</name>
-            <description>the path of the input MDStore</description>
+            <name>isLookupUrl</name>
+            <description>The IS lookUp service endopoint</description>
         </property>
-        <property>
-            <name>skipimport</name>
-            <value>false</value>
-            <description>the path of the input MDStore</description>
-        </property>
-
-
     </parameters>

-    <start to="resume_from"/>
+    <start to="ImportDatacite"/>


     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>

-    <decision name="resume_from">
-        <switch>
-            <case to="TransformJob">${wf:conf('resumeFrom') eq 'TransformJob'}</case>
-            <case to="ExportDataset">${wf:conf('resumeFrom') eq 'ExportDataset'}</case>
-            <default to="ImportDatacite"/>
-        </switch>
-    </decision>
-
     <action name="ImportDatacite">
         <spark xmlns="uri:oozie:spark-action:0.2">
@@ -53,10 +33,9 @@
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            </spark-opts>
-           <arg>-t</arg><arg>${nativeInputPath}</arg>
-           <arg>-d</arg><arg>${mdstoreInputPath}</arg>
-           <arg>-n</arg><arg>${nameNode}</arg>
-           <arg>-s</arg><arg>${skipimport}</arg>
+           <arg>--targetPath</arg><arg>${mainPath}/datacite_update</arg>
+           <arg>--dataciteDumpPath</arg><arg>${mainPath}/datacite_dump</arg>
+           <arg>--namenode</arg><arg>${nameNode}</arg>
            <arg>--master</arg><arg>yarn-cluster</arg>
        </spark>
        <ok to="TransformJob"/>
@@ -81,44 +60,9 @@
                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            </spark-opts>
-           <arg>--sourcePath</arg><arg>${mdstoreInputPath}</arg>
-           <arg>--targetPath</arg><arg>${mdstoreOutputPath}</arg>
+           <arg>--sourcePath</arg><arg>${mainPath}/datacite_dump</arg>
+           <arg>--targetPath</arg><arg>${mainPath}/datacite_oaf</arg>
            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
-           <arg>-tr</arg><arg>${isLookupUrl}</arg>
-           <arg>--master</arg><arg>yarn-cluster</arg>
-       </spark>
-       <ok to="End"/>
-       <error to="Kill"/>
-   </action>
-
-   <action name="DeletePathIfExists">
-       <fs>
-           <delete path='${mdstoreOutputPath}_raw_AS'/>
-       </fs>
-       <ok to="ExportDataset"/>
-       <error to="Kill"/>
-   </action>
-
-
-   <action name="ExportDataset">
-       <spark xmlns="uri:oozie:spark-action:0.2">
-           <master>yarn-cluster</master>
-           <mode>cluster</mode>
-           <name>ExportDataset</name>
-           <class>eu.dnetlib.dhp.actionmanager.datacite.ExportActionSetJobNode</class>
-           <jar>dhp-aggregation-${projectVersion}.jar</jar>
-           <spark-opts>
-               --executor-memory=${sparkExecutorMemory}
-               --executor-cores=${sparkExecutorCores}
-               --driver-memory=${sparkDriverMemory}
-               --conf spark.sql.shuffle.partitions=3840
-               --conf spark.extraListeners=${spark2ExtraListeners}
-               --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-               --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-               --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-           </spark-opts>
-           <arg>--sourcePath</arg><arg>${mdstoreOutputPath}</arg>
-           <arg>--targetPath</arg><arg>${mdstoreOutputPath}_raw_AS</arg>
            <arg>--master</arg><arg>yarn-cluster</arg>
        </spark>
        <ok to="End"/>
@@ -0,0 +1,35 @@
+package eu.dnetlib.dhp.actionmanager.datacite
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
+import eu.dnetlib.dhp.schema.oaf.Oaf
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.mockito.junit.jupiter.MockitoExtension
+
+import scala.io.Source
+
+@ExtendWith(Array(classOf[MockitoExtension]))
+class DataciteToOAFTest extends AbstractVocabularyTest {
+
+  @BeforeEach
+  def setUp(): Unit = {
+    println("Called Method")
+    super.setUpVocabulary()
+  }
+
+  @Test
+  def testMapping(): Unit = {
+    val record = Source.fromInputStream(getClass.getResourceAsStream("record.json")).mkString
+
+    val mapper = new ObjectMapper()
+    val res: List[Oaf] = DataciteToOAFTransformation.generateOAF(record, 0L, 0L, vocabularies)
+    println(mapper.writeValueAsString(res.head))
+  }
+
+}

File diff suppressed because one or more lines are too long
@@ -6,7 +6,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.types._
 import org.slf4j.{Logger, LoggerFactory}
-import org.apache.spark.sql.functions._

 object SparkImportMagIntoDataset {
   val datatypedict = Map(
@@ -25,11 +24,10 @@ object SparkImportMagIntoDataset {
     "AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")),
     "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
     "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
-    "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
+    "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
     "EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")),
     "FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")),
     "FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")),
-    // ['FieldOfStudyId:long', 'Rank:uint', 'NormalizedName:string', 'DisplayName:string', 'MainType:string', 'Level:int', 'PaperCount:long', 'PaperFamilyCount:long', 'CitationCount:long', 'CreatedDate:DateTime']
     "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
     "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "CreatedDate:DateTime")),
     "PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")),
@@ -37,6 +35,7 @@ object SparkImportMagIntoDataset {
     "PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")),
     "PaperExtendedAttributes" -> Tuple2("mag/PaperExtendedAttributes.txt", Seq("PaperId:long", "AttributeType:int", "AttributeValue:string")),
     "PaperFieldsOfStudy" -> Tuple2("advanced/PaperFieldsOfStudy.txt", Seq("PaperId:long", "FieldOfStudyId:long", "Score:float")),
+    "PaperMeSH" -> Tuple2("advanced/PaperMeSH.txt", Seq("PaperId:long", "DescriptorUI:string", "DescriptorName:string", "QualifierUI:string", "QualifierName:string", "IsMajorTopic:bool")),
     "PaperRecommendations" -> Tuple2("advanced/PaperRecommendations.txt", Seq("PaperId:long", "RecommendedPaperId:long", "Score:float")),
     "PaperReferences" -> Tuple2("mag/PaperReferences.txt", Seq("PaperId:long", "PaperReferenceId:long")),
     "PaperResources" -> Tuple2("mag/PaperResources.txt", Seq("PaperId:long", "ResourceType:int", "ResourceUrl:string", "SourceUrl:string", "RelationshipType:int")),