WIP: using common definitions from ModelConstants

pull/104/head
Claudio Atzori 3 years ago
commit 879e8cc7ef

@ -100,6 +100,13 @@ public class ModelConstants {
public static final String UNKNOWN = "UNKNOWN";
public static final String NOT_AVAILABLE = "not available";
public static final String ACTION_SET_SCHEME = "sysimport:actionset";
public static final String PROVENANCE_VOCABULARY = "dnet:provenanceActions";
public static final Qualifier ACTION_SET_PROVENANCE_QUALIFIER = qualifier(
ACTION_SET_SCHEME, ACTION_SET_SCHEME, PROVENANCE_VOCABULARY, PROVENANCE_VOCABULARY);
public static final Qualifier PUBLICATION_DEFAULT_RESULTTYPE = qualifier(
PUBLICATION_RESULTTYPE_CLASSID, PUBLICATION_RESULTTYPE_CLASSID,
DNET_RESULT_TYPOLOGIES, DNET_RESULT_TYPOLOGIES);

@ -5,6 +5,9 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, KeyValue, Oaf, OafMapperUtils, OtherResearchProduct, Publication, Qualifier, Relation, Result, Software, StructuredProperty, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
import eu.dnetlib.dhp.schema.oaf.{AccessRight, Author, DataInfo, Instance, KeyValue, Oaf, OafMapperUtils, OtherResearchProduct, Publication, Qualifier, Relation, Result, Software, StructuredProperty, Dataset => OafDataset}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.lang3.StringUtils
import org.json4s.DefaultFormats
@ -49,9 +52,9 @@ object DataciteToOAFTransformation {
val ACCESS_MODE_VOCABULARY = "dnet:access_modes"
val DOI_CLASS = "doi"
val TITLE_SCHEME = "dnet:dataCite_title"
val SUBJ_CLASS = "keywords"
val SUBJ_SCHEME = "dnet:subject_classification_typologies"
val j_filter:List[String] = {
val s = Source.fromInputStream(getClass.getResourceAsStream("datacite_filter")).mkString
@ -62,7 +65,7 @@ object DataciteToOAFTransformation {
val unknown_repository: HostedByMapType = HostedByMapType("openaire____::1256f046-bf1f-4afc-8b47-d0b147148b18", "Unknown Repository", "Unknown Repository", Some(1.0F))
val dataInfo: DataInfo = generateDataInfo("0.9")
val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue("openaire____::9e3be59865b2c1c335d32dae2fe7b254", "Datacite")
val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue(ModelConstants.DATACITE_ID, "Datacite")
val hostedByMap: Map[String, HostedByMapType] = {
val s = Source.fromInputStream(getClass.getResourceAsStream("hostedBy_map.json")).mkString
@ -291,7 +294,7 @@ object DataciteToOAFTransformation {
return List()
val doi_q = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PID_TYPES, "doi")
val doi_q = OafMapperUtils.qualifier("doi", "doi", ModelConstants.DNET_PID_TYPES,ModelConstants.DNET_PID_TYPES)
val pid = OafMapperUtils.structuredProperty(doi, doi_q, dataInfo)
result.setPid(List(pid).asJava)
result.setId(OafMapperUtils.createOpenaireId(50, s"datacite____::$doi", true))
@ -339,9 +342,9 @@ object DataciteToOAFTransformation {
result.setTitle(titles.filter(t => t.title.nonEmpty).map(t => {
if (t.titleType.isEmpty) {
OafMapperUtils.structuredProperty(t.title.get, "main title", "main title", TITLE_SCHEME, TITLE_SCHEME, null)
OafMapperUtils.structuredProperty(t.title.get, ModelConstants.MAIN_TITLE_QUALIFIER, null)
} else {
OafMapperUtils.structuredProperty(t.title.get, t.titleType.get, t.titleType.get, TITLE_SCHEME, TITLE_SCHEME, null)
OafMapperUtils.structuredProperty(t.title.get, t.titleType.get, t.titleType.get, ModelConstants.DNET_DATACITE_TITLE, ModelConstants.DNET_DATACITE_TITLE, null)
}
}).asJava)
@ -386,7 +389,7 @@ object DataciteToOAFTransformation {
result.setSubject(subjects.filter(s => s.subject.nonEmpty)
.map(s =>
OafMapperUtils.structuredProperty(s.subject.get, SUBJ_CLASS, SUBJ_CLASS, SUBJ_SCHEME, SUBJ_SCHEME, null)
OafMapperUtils.structuredProperty(s.subject.get, SUBJ_CLASS, SUBJ_CLASS, ModelConstants.DNET_SUBJECT_TYPOLOGIES, ModelConstants.DNET_SUBJECT_TYPOLOGIES, null)
).asJava)
@ -422,28 +425,33 @@ object DataciteToOAFTransformation {
JField("rightsUri", JString(rightsUri)) <- rightsList
} yield rightsUri
val aRights: Option[Qualifier] = accessRights.map(r => {
val aRights: Option[AccessRight] = accessRights.map(r => {
vocabularies.getSynonymAsQualifier(ModelConstants.DNET_ACCESS_MODES, r)
}).find(q => q != null)
}).find(q => q != null).map(q => {
val a = new AccessRight
a.setClassid(q.getClassid)
a.setClassname(q.getClassname)
a.setSchemeid(q.getSchemeid)
a.setSchemename(q.getSchemename)
a
})
val access_rights_qualifier = if (aRights.isDefined) aRights.get else OafMapperUtils.qualifier(ModelConstants.UNKNOWN, ModelConstants.NOT_AVAILABLE, ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
val access_rights_qualifier = if (aRights.isDefined) aRights.get else OafMapperUtils.accessRight("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)
if (client.isDefined) {
val hb = hostedByMap.getOrElse(client.get.toUpperCase(), unknown_repository)
instance.setHostedby(OafMapperUtils.keyValue(generateDSId(hb.openaire_id), hb.official_name))
instance.setCollectedfrom(DATACITE_COLLECTED_FROM)
instance.setUrl(List(s"https://dx.doi.org/$doi").asJava)
// instance.setAccessright(access_rights_qualifier)
//'http') and matches(., '.*(/licenses|/publicdomain|unlicense.org/|/legal-and-data-protection-notices|/download/license|/open-government-licence).*')]">
instance.setAccessright(access_rights_qualifier)
instance.setPid(result.getPid)
val license = accessRights
.find(r => r.startsWith("http") && r.matches(".*(/licenses|/publicdomain|unlicense\\.org/|/legal-and-data-protection-notices|/download/license|/open-government-licence).*"))
if (license.isDefined)
instance.setLicense(OafMapperUtils.field(license.get, null))
}
val awardUris:List[String] = for {
JObject(fundingReferences) <- json \\ "fundingReferences"
JField("awardUri", JString(awardUri)) <- fundingReferences
@ -451,6 +459,9 @@ object DataciteToOAFTransformation {
val relations:List[Relation] =awardUris.flatMap(a=> get_projectRelation(a, result.getId)).filter(r => r!= null)
result.setId(IdentifierFactory.createIdentifier(result))
if(result.getId == null)
return List()
if (relations!= null && relations.nonEmpty) {
List(result):::relations
}
@ -464,7 +475,7 @@ object DataciteToOAFTransformation {
di.setInferred(false)
di.setInvisible(false)
di.setTrust(trust)
di.setProvenanceaction(OafMapperUtils.qualifier("sysimport:actionset", "sysimport:actionset", "dnet:provenanceActions", "dnet:provenanceActions"))
di.setProvenanceaction(ModelConstants.ACTION_SET_PROVENANCE_QUALIFIER)
di
}

@ -27,7 +27,6 @@ object GenerateDataciteDatasetSpark {
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
log.info(s"vocabulary size is ${vocabularies.getTerms("dnet:languages").size()}")
val spark: SparkSession = SparkSession.builder().config(conf)
.appName(GenerateDataciteDatasetSpark.getClass.getSimpleName)
.master(master)

@ -148,7 +148,7 @@ object ImportDatacite {
try {
var start: Long = System.currentTimeMillis
while (from < now) {
client = new DataciteAPIImporter(from, 1000, from + delta)
client = new DataciteAPIImporter(from, 100, from + delta)
var end: Long = 0
val key: IntWritable = new IntWritable(i)
val value: Text = new Text

@ -0,0 +1,23 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>

@ -0,0 +1,46 @@
<workflow-app name="Datacite_to_ActionSet_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the working path of Datacite stores</description>
</property>
<property>
<name>outputPath</name>
<description>the path of Datacite ActionSet</description>
</property>
</parameters>
<start to="ExportDataset"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ExportDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExportDataset</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.ExportActionSetJobNode</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${outputPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -12,12 +12,6 @@
"paramDescription": "the target mdstore path",
"paramRequired": true
},
{
"paramName": "tr",
"paramLongName": "transformationRule",
"paramDescription": "the transformation Rule",
"paramRequired": true
},
{
"paramName": "m",
"paramLongName": "master",

@ -1,41 +1,21 @@
<workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mdstoreInputPath</name>
<description>the path of the input MDStore</description>
</property>
<property>
<name>mdstoreOutputPath</name>
<description>the path of the cleaned mdstore</description>
</property>
<property>
<name>nativeInputPath</name>
<description>the path of the input MDStore</description>
<name>mainPath</name>
<description>the working path of Datacite stores</description>
</property>
<property>
<name>skipimport</name>
<value>false</value>
<description>the path of the input MDStore</description>
<name>isLookupUrl</name>
<description>The IS lookUp service endopoint</description>
</property>
</parameters>
<start to="resume_from"/>
<start to="ImportDatacite"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<decision name="resume_from">
<switch>
<case to="TransformJob">${wf:conf('resumeFrom') eq 'TransformJob'}</case>
<case to="ExportDataset">${wf:conf('resumeFrom') eq 'ExportDataset'}</case>
<default to="ImportDatacite"/>
</switch>
</decision>
<action name="ImportDatacite">
<spark xmlns="uri:oozie:spark-action:0.2">
@ -53,10 +33,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>-t</arg><arg>${nativeInputPath}</arg>
<arg>-d</arg><arg>${mdstoreInputPath}</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-s</arg><arg>${skipimport}</arg>
<arg>--targetPath</arg><arg>${mainPath}/datacite_update</arg>
<arg>--dataciteDumpPath</arg><arg>${mainPath}/datacite_dump</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="TransformJob"/>
@ -81,44 +60,9 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${mdstoreInputPath}</arg>
<arg>--targetPath</arg><arg>${mdstoreOutputPath}</arg>
<arg>--sourcePath</arg><arg>${mainPath}/datacite_dump</arg>
<arg>--targetPath</arg><arg>${mainPath}/datacite_oaf</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>-tr</arg><arg>${isLookupUrl}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<action name="DeletePathIfExists">
<fs>
<delete path='${mdstoreOutputPath}_raw_AS'/>
</fs>
<ok to="ExportDataset"/>
<error to="Kill"/>
</action>
<action name="ExportDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExportDataset</name>
<class>eu.dnetlib.dhp.actionmanager.datacite.ExportActionSetJobNode</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${mdstoreOutputPath}</arg>
<arg>--targetPath</arg><arg>${mdstoreOutputPath}_raw_AS</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>

@ -1,4 +1,4 @@
<workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
<workflow-app name="Transformation_Workflow" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>mdStoreInputId</name>

@ -0,0 +1,35 @@
package eu.dnetlib.dhp.actionmanager.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.junit.jupiter.MockitoExtension
import scala.io.Source
@ExtendWith(Array(classOf[MockitoExtension]))
class DataciteToOAFTest extends AbstractVocabularyTest{
@BeforeEach
def setUp() :Unit = {
println("Called Method")
super.setUpVocabulary()
}
@Test
def testMapping() :Unit = {
val record =Source.fromInputStream(getClass.getResourceAsStream("record.json")).mkString
val mapper = new ObjectMapper()
val res:List[Oaf] =DataciteToOAFTransformation.generateOAF(record, 0L,0L, vocabularies )
println (mapper.writeValueAsString(res.head))
}
}

@ -6,7 +6,6 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types._
import org.slf4j.{Logger, LoggerFactory}
import org.apache.spark.sql.functions._
object SparkImportMagIntoDataset {
val datatypedict = Map(
@ -25,11 +24,10 @@ object SparkImportMagIntoDataset {
"AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")),
"Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
"ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")),
"FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")),
"FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")),
// ['FieldOfStudyId:long', 'Rank:uint', 'NormalizedName:string', 'DisplayName:string', 'MainType:string', 'Level:int', 'PaperCount:long', 'PaperFamilyCount:long', 'CitationCount:long', 'CreatedDate:DateTime']
"FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "CreatedDate:DateTime")),
"PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")),
@ -37,6 +35,7 @@ object SparkImportMagIntoDataset {
"PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")),
"PaperExtendedAttributes" -> Tuple2("mag/PaperExtendedAttributes.txt", Seq("PaperId:long", "AttributeType:int", "AttributeValue:string")),
"PaperFieldsOfStudy" -> Tuple2("advanced/PaperFieldsOfStudy.txt", Seq("PaperId:long", "FieldOfStudyId:long", "Score:float")),
"PaperMeSH" -> Tuple2("advanced/PaperMeSH.txt", Seq("PaperId:long", "DescriptorUI:string", "DescriptorName:string", "QualifierUI:string", "QualifierName:string", "IsMajorTopic:bool")),
"PaperRecommendations" -> Tuple2("advanced/PaperRecommendations.txt", Seq("PaperId:long", "RecommendedPaperId:long", "Score:float")),
"PaperReferences" -> Tuple2("mag/PaperReferences.txt", Seq("PaperId:long", "PaperReferenceId:long")),
"PaperResources" -> Tuple2("mag/PaperResources.txt", Seq("PaperId:long", "ResourceType:int", "ResourceUrl:string", "SourceUrl:string", "RelationshipType:int")),

Loading…
Cancel
Save