forked from D-Net/dnet-hadoop
Datacite: code refactor introducing a general Scala SparkApplication base that all the Spark Scala applications have to inherit from
Added a few comments to the Datacite transformation code
This commit is contained in:
parent a7cf277d98
commit 2164a2a889
@@ -0,0 +1,37 @@
package eu.dnetlib.dhp.application

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.Logger

abstract class AbstractScalaApplication(val propertyPath: String, val args: Array[String], log: Logger) extends SparkScalaApplication {

  var parser: ArgumentApplicationParser = null

  var spark: SparkSession = null

  def initialize(): SparkScalaApplication = {
    parser = parseArguments(args)
    spark = createSparkSession()
    this
  }

  /**
   * Utility for creating a Spark session starting from the parser
   *
   * @return a Spark session
   */
  private def createSparkSession(): SparkSession = {
    require(parser != null)

    val conf: SparkConf = new SparkConf()
    val master = parser.get("master")
    log.info(s"Creating Spark session: Master: $master")
    SparkSession.builder().config(conf)
      .appName(getClass.getSimpleName)
      .master(master)
      .getOrCreate()
  }

}
@@ -0,0 +1,35 @@
package eu.dnetlib.dhp.application

import scala.io.Source

/**
 * This is the main interface SparkApplication
 * from which all the Spark Scala classes should inherit.
 *
 */
trait SparkScalaApplication {
  /**
   * This is the classpath of the json file
   * that describes all the arguments needed to run the application.
   */
  val propertyPath: String

  /**
   * Utility to parse the arguments using the
   * property json in the classpath identified by
   * the variable propertyPath.
   *
   * @param args the list of arguments
   */
  def parseArguments(args: Array[String]): ArgumentApplicationParser = {
    val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream(propertyPath)).mkString)
    parser.parseArgument(args)
    parser
  }

  /**
   * All the Spark applications run this method,
   * where the whole logic of the Spark node is defined.
   */
  def run(): Unit
}
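For reference, a minimal sketch of how a job can plug into the two classes above (the class name, property path and job logic below are hypothetical and not part of this commit; only the initialize()/run() lifecycle mirrors the code in the diff):

package eu.dnetlib.dhp.application

import org.slf4j.{Logger, LoggerFactory}

// Hypothetical Spark job built on top of AbstractScalaApplication
class ExampleWordCountSpark(propertyPath: String, args: Array[String], log: Logger)
  extends AbstractScalaApplication(propertyPath, args, log) {

  // The whole logic of the node lives in run(); `parser` and `spark` are populated by initialize()
  override def run(): Unit = {
    val sourcePath = parser.get("sourcePath") // argument name is an assumption
    log.info(s"sourcePath is '$sourcePath'")
    val lines = spark.read.textFile(sourcePath).count()
    log.info(s"Read $lines lines")
  }
}

object ExampleWordCountSpark {
  val log: Logger = LoggerFactory.getLogger(classOf[ExampleWordCountSpark])

  def main(args: Array[String]): Unit = {
    // hypothetical json resource describing the job arguments
    new ExampleWordCountSpark("/eu/dnetlib/dhp/example/example_params.json", args, log)
      .initialize()
      .run()
  }
}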
@@ -0,0 +1,134 @@
package eu.dnetlib.dhp.datacite

import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue}
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils

import java.io.InputStream
import java.time.format.DateTimeFormatter
import java.util.Locale
import java.util.regex.Pattern
import scala.io.Source

/**
 * This class represents the data model of the input Datacite Dataset
 * @param doi       the DOI
 * @param timestamp timestamp of the last update date
 * @param isActive  whether the record is active or deleted
 * @param json      the native json record
 */
case class DataciteType(doi: String, timestamp: Long, isActive: Boolean, json: String) {}

/*
  The following classes are utility classes used for the mapping from
  Datacite json to the OAF schema
*/
case class RelatedIdentifierType(relationType: String, relatedIdentifier: String, relatedIdentifierType: String) {}

case class NameIdentifiersType(nameIdentifierScheme: Option[String], schemeUri: Option[String], nameIdentifier: Option[String]) {}

case class CreatorType(nameType: Option[String], nameIdentifiers: Option[List[NameIdentifiersType]], name: Option[String], familyName: Option[String], givenName: Option[String], affiliation: Option[List[String]]) {}

case class TitleType(title: Option[String], titleType: Option[String], lang: Option[String]) {}

case class SubjectType(subject: Option[String], subjectScheme: Option[String]) {}

case class DescriptionType(descriptionType: Option[String], description: Option[String]) {}

case class FundingReferenceType(funderIdentifierType: Option[String], awardTitle: Option[String], awardUri: Option[String], funderName: Option[String], funderIdentifier: Option[String], awardNumber: Option[String]) {}

case class DateType(date: Option[String], dateType: Option[String]) {}

case class OAFRelations(relation: String, inverse: String, relType: String)


class DataciteModelConstants extends Serializable {

}

object DataciteModelConstants {

  val REL_TYPE_VALUE: String = "resultResult"
  val DATE_RELATION_KEY = "RelationDate"
  val DATACITE_FILTER_PATH = "/eu/dnetlib/dhp/datacite/datacite_filter"
  val DOI_CLASS = "doi"
  val SUBJ_CLASS = "keywords"
  val DATACITE_NAME = "Datacite"
  val dataInfo: DataInfo = dataciteDataInfo("0.9")
  val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue(ModelConstants.DATACITE_ID, DATACITE_NAME)

  val subRelTypeMapping: Map[String, OAFRelations] = Map(
    ModelConstants.REFERENCES -> OAFRelations(ModelConstants.REFERENCES, ModelConstants.IS_REFERENCED_BY, ModelConstants.RELATIONSHIP),
    ModelConstants.IS_REFERENCED_BY -> OAFRelations(ModelConstants.IS_REFERENCED_BY, ModelConstants.REFERENCES, ModelConstants.RELATIONSHIP),

    ModelConstants.IS_SUPPLEMENTED_BY -> OAFRelations(ModelConstants.IS_SUPPLEMENTED_BY, ModelConstants.IS_SUPPLEMENT_TO, ModelConstants.SUPPLEMENT),
    ModelConstants.IS_SUPPLEMENT_TO -> OAFRelations(ModelConstants.IS_SUPPLEMENT_TO, ModelConstants.IS_SUPPLEMENTED_BY, ModelConstants.SUPPLEMENT),

    ModelConstants.HAS_PART -> OAFRelations(ModelConstants.HAS_PART, ModelConstants.IS_PART_OF, ModelConstants.PART),
    ModelConstants.IS_PART_OF -> OAFRelations(ModelConstants.IS_PART_OF, ModelConstants.HAS_PART, ModelConstants.PART),

    ModelConstants.IS_VERSION_OF -> OAFRelations(ModelConstants.IS_VERSION_OF, ModelConstants.HAS_VERSION, ModelConstants.VERSION),
    ModelConstants.HAS_VERSION -> OAFRelations(ModelConstants.HAS_VERSION, ModelConstants.IS_VERSION_OF, ModelConstants.VERSION),

    ModelConstants.IS_IDENTICAL_TO -> OAFRelations(ModelConstants.IS_IDENTICAL_TO, ModelConstants.IS_IDENTICAL_TO, ModelConstants.RELATIONSHIP),

    ModelConstants.IS_CONTINUED_BY -> OAFRelations(ModelConstants.IS_CONTINUED_BY, ModelConstants.CONTINUES, ModelConstants.RELATIONSHIP),
    ModelConstants.CONTINUES -> OAFRelations(ModelConstants.CONTINUES, ModelConstants.IS_CONTINUED_BY, ModelConstants.RELATIONSHIP),

    ModelConstants.IS_NEW_VERSION_OF -> OAFRelations(ModelConstants.IS_NEW_VERSION_OF, ModelConstants.IS_PREVIOUS_VERSION_OF, ModelConstants.VERSION),
    ModelConstants.IS_PREVIOUS_VERSION_OF -> OAFRelations(ModelConstants.IS_PREVIOUS_VERSION_OF, ModelConstants.IS_NEW_VERSION_OF, ModelConstants.VERSION),

    ModelConstants.IS_DOCUMENTED_BY -> OAFRelations(ModelConstants.IS_DOCUMENTED_BY, ModelConstants.DOCUMENTS, ModelConstants.RELATIONSHIP),
    ModelConstants.DOCUMENTS -> OAFRelations(ModelConstants.DOCUMENTS, ModelConstants.IS_DOCUMENTED_BY, ModelConstants.RELATIONSHIP),

    ModelConstants.IS_SOURCE_OF -> OAFRelations(ModelConstants.IS_SOURCE_OF, ModelConstants.IS_DERIVED_FROM, ModelConstants.VERSION),
    ModelConstants.IS_DERIVED_FROM -> OAFRelations(ModelConstants.IS_DERIVED_FROM, ModelConstants.IS_SOURCE_OF, ModelConstants.VERSION),

    ModelConstants.CITES -> OAFRelations(ModelConstants.CITES, ModelConstants.IS_CITED_BY, ModelConstants.CITATION),
    ModelConstants.IS_CITED_BY -> OAFRelations(ModelConstants.IS_CITED_BY, ModelConstants.CITES, ModelConstants.CITATION),

    ModelConstants.IS_VARIANT_FORM_OF -> OAFRelations(ModelConstants.IS_VARIANT_FORM_OF, ModelConstants.IS_DERIVED_FROM, ModelConstants.VERSION),
    ModelConstants.IS_OBSOLETED_BY -> OAFRelations(ModelConstants.IS_OBSOLETED_BY, ModelConstants.IS_NEW_VERSION_OF, ModelConstants.VERSION),

    ModelConstants.REVIEWS -> OAFRelations(ModelConstants.REVIEWS, ModelConstants.IS_REVIEWED_BY, ModelConstants.REVIEW),
    ModelConstants.IS_REVIEWED_BY -> OAFRelations(ModelConstants.IS_REVIEWED_BY, ModelConstants.REVIEWS, ModelConstants.REVIEW),

    ModelConstants.DOCUMENTS -> OAFRelations(ModelConstants.DOCUMENTS, ModelConstants.IS_DOCUMENTED_BY, ModelConstants.RELATIONSHIP),
    ModelConstants.IS_DOCUMENTED_BY -> OAFRelations(ModelConstants.IS_DOCUMENTED_BY, ModelConstants.DOCUMENTS, ModelConstants.RELATIONSHIP),

    ModelConstants.COMPILES -> OAFRelations(ModelConstants.COMPILES, ModelConstants.IS_COMPILED_BY, ModelConstants.RELATIONSHIP),
    ModelConstants.IS_COMPILED_BY -> OAFRelations(ModelConstants.IS_COMPILED_BY, ModelConstants.COMPILES, ModelConstants.RELATIONSHIP)
  )


  val datacite_filter: List[String] = {
    val stream: InputStream = getClass.getResourceAsStream(DATACITE_FILTER_PATH)
    require(stream != null)
    Source.fromInputStream(stream).getLines().toList
  }


  def dataciteDataInfo(trust: String): DataInfo = OafMapperUtils.dataInfo(false, null, false, false, ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, trust)

  val df_en: DateTimeFormatter = DateTimeFormatter.ofPattern("[MM-dd-yyyy][MM/dd/yyyy][dd-MM-yy][dd-MMM-yyyy][dd/MMM/yyyy][dd-MMM-yy][dd/MMM/yy][dd-MM-yy][dd/MM/yy][dd-MM-yyyy][dd/MM/yyyy][yyyy-MM-dd][yyyy/MM/dd]", Locale.ENGLISH)
  val df_it: DateTimeFormatter = DateTimeFormatter.ofPattern("[dd-MM-yyyy][dd/MM/yyyy]", Locale.ITALIAN)

  val funder_regex: List[(Pattern, String)] = List(
    (Pattern.compile("(info:eu-repo/grantagreement/ec/h2020/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda__h2020::"),
    (Pattern.compile("(info:eu-repo/grantagreement/ec/fp7/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda_______::")
  )

  val Date_regex: List[Pattern] = List(
    //Y-M-D
    Pattern.compile("(18|19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])", Pattern.MULTILINE),
    //M-D-Y
    Pattern.compile("((0[1-9]|1[012])|([1-9]))([- /.])(0[1-9]|[12][0-9]|3[01])([- /.])(18|19|20)?\\d\\d", Pattern.MULTILINE),
    //D-M-Y
    Pattern.compile("(?:(?:31(/|-|\\.)(?:0?[13578]|1[02]|(?:Jan|Mar|May|Jul|Aug|Oct|Dec)))\\1|(?:(?:29|30)(/|-|\\.)(?:0?[1,3-9]|1[0-2]|(?:Jan|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))\\2))(?:(?:1[6-9]|[2-9]\\d)?\\d{2})|(?:29(/|-|\\.)(?:0?2|(?:Feb))\\3(?:(?:(?:1[6-9]|[2-9]\\d)?(?:0[48]|[2468][048]|[13579][26])|(?:(?:16|[2468][048]|[3579][26])00))))|(?:0?[1-9]|1\\d|2[0-8])(/|-|\\.)(?:(?:0?[1-9]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep))|(?:1[0-2]|(?:Oct|Nov|Dec)))\\4(?:(?:1[6-9]|[2-9]\\d)?\\d{2})", Pattern.MULTILINE),
    //Y
    Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE)
  )

}
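To make the shape of the refactored mapping concrete, a small lookup sketch (not part of the commit; it assumes the ModelConstants values keep the literal relation names used by the old tuple-based map, e.g. "Cites"/"IsCitedBy"/"citation"):

import eu.dnetlib.dhp.datacite.DataciteModelConstants._
import eu.dnetlib.dhp.schema.common.ModelConstants

// Each entry now carries the direct relation, its inverse and the subRelType in one case class
val cites: OAFRelations = subRelTypeMapping(ModelConstants.CITES)
// cites.relation -> relClass of the direct relation (e.g. "Cites")
// cites.inverse  -> relClass a later step can use for the inverse relation (e.g. "IsCitedBy")
// cites.relType  -> the subRelType (e.g. "citation"); the relType itself is always REL_TYPE_VALUE ("resultResult")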
@@ -2,6 +2,7 @@ package eu.dnetlib.dhp.datacite

import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
+import eu.dnetlib.dhp.datacite.DataciteModelConstants._
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils}

@@ -12,115 +13,30 @@ import org.json4s.DefaultFormats

import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse
-import java.nio.charset.CodingErrorAction
import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.chrono.ThaiBuddhistDate
import java.time.format.DateTimeFormatter
-import java.util.regex.Pattern
import java.util.{Date, Locale}
import scala.collection.JavaConverters._
-import scala.io.{Codec, Source}
-import scala.language.postfixOps
-
-case class DataciteType(doi: String, timestamp: Long, isActive: Boolean, json: String) {}
-
-case class RelatedIdentifierType(relationType: String, relatedIdentifier: String, relatedIdentifierType: String) {}
-
-case class NameIdentifiersType(nameIdentifierScheme: Option[String], schemeUri: Option[String], nameIdentifier: Option[String]) {}
-
-case class CreatorType(nameType: Option[String], nameIdentifiers: Option[List[NameIdentifiersType]], name: Option[String], familyName: Option[String], givenName: Option[String], affiliation: Option[List[String]]) {}
-
-case class TitleType(title: Option[String], titleType: Option[String], lang: Option[String]) {}
-
-case class SubjectType(subject: Option[String], subjectScheme: Option[String]) {}
-
-case class DescriptionType(descriptionType: Option[String], description: Option[String]) {}
-
-case class FundingReferenceType(funderIdentifierType: Option[String], awardTitle: Option[String], awardUri: Option[String], funderName: Option[String], funderIdentifier: Option[String], awardNumber: Option[String]) {}
-
-case class DateType(date: Option[String], dateType: Option[String]) {}
-
-//case class HostedByMapType(openaire_id: String, datacite_name: String, official_name: String, similarity: Option[Float]) {}

object DataciteToOAFTransformation {

-  val REL_TYPE_VALUE: String = "resultResult"
-  val DATE_RELATION_KEY = "RelationDate"
-
-  val subRelTypeMapping: Map[String, (String, String)] = Map(
-    "References" -> ("IsReferencedBy", "relationship"),
-    "IsSupplementTo" -> ("IsSupplementedBy", "supplement"),
-    "IsPartOf" -> ("HasPart", "part"),
-    "HasPart" -> ("IsPartOf", "part"),
-    "IsVersionOf" -> ("HasVersion", "version"),
-    "HasVersion" -> ("IsVersionOf", "version"),
-    "IsIdenticalTo" -> ("IsIdenticalTo", "relationship"),
-    "IsPreviousVersionOf" -> ("IsNewVersionOf", "version"),
-    "IsContinuedBy" -> ("Continues", "relationship"),
-    "Continues" -> ("IsContinuedBy", "relationship"),
-    "IsNewVersionOf" -> ("IsPreviousVersionOf", "version"),
-    "IsSupplementedBy" -> ("IsSupplementTo", "supplement"),
-    "IsDocumentedBy" -> ("Documents", "relationship"),
-    "IsSourceOf" -> ("IsDerivedFrom", "relationship"),
-    "Cites" -> ("IsCitedBy", "citation"),
-    "IsCitedBy" -> ("Cites", "citation"),
-    "IsDerivedFrom" -> ("IsSourceOf", "relationship"),
-    "IsVariantFormOf" -> ("IsDerivedFrom", "version"),
-    "IsReferencedBy" -> ("References", "relationship"),
-    "IsObsoletedBy" -> ("IsNewVersionOf", "version"),
-    "Reviews" -> ("IsReviewedBy", "review"),
-    "Documents" -> ("IsDocumentedBy", "relationship"),
-    "IsCompiledBy" -> ("Compiles", "relationship"),
-    "Compiles" -> ("IsCompiledBy", "relationship"),
-    "IsReviewedBy" -> ("Reviews", "review")
-  )
-
-  implicit val codec: Codec = Codec("UTF-8")
-  codec.onMalformedInput(CodingErrorAction.REPLACE)
-  codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
-
-  val DOI_CLASS = "doi"
-  val SUBJ_CLASS = "keywords"
-
-  val j_filter: List[String] = {
-    val s = Source.fromInputStream(getClass.getResourceAsStream("datacite_filter")).mkString
-    s.lines.toList
-  }
-
  val mapper = new ObjectMapper()
-
-  val dataInfo: DataInfo = generateDataInfo("0.9")
-  val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue(ModelConstants.DATACITE_ID, "Datacite")
-
-  val df_en: DateTimeFormatter = DateTimeFormatter.ofPattern("[MM-dd-yyyy][MM/dd/yyyy][dd-MM-yy][dd-MMM-yyyy][dd/MMM/yyyy][dd-MMM-yy][dd/MMM/yy][dd-MM-yy][dd/MM/yy][dd-MM-yyyy][dd/MM/yyyy][yyyy-MM-dd][yyyy/MM/dd]", Locale.ENGLISH)
-  val df_it: DateTimeFormatter = DateTimeFormatter.ofPattern("[dd-MM-yyyy][dd/MM/yyyy]", Locale.ITALIAN)
-
-  val funder_regex: List[(Pattern, String)] = List(
-    (Pattern.compile("(info:eu-repo/grantagreement/ec/h2020/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda__h2020::"),
-    (Pattern.compile("(info:eu-repo/grantagreement/ec/fp7/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda_______::")
-  )
-
-  val Date_regex: List[Pattern] = List(
-    //Y-M-D
-    Pattern.compile("(18|19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])", Pattern.MULTILINE),
-    //M-D-Y
-    Pattern.compile("((0[1-9]|1[012])|([1-9]))([- /.])(0[1-9]|[12][0-9]|3[01])([- /.])(18|19|20)?\\d\\d", Pattern.MULTILINE),
-    //D-M-Y
-    Pattern.compile("(?:(?:31(/|-|\\.)(?:0?[13578]|1[02]|(?:Jan|Mar|May|Jul|Aug|Oct|Dec)))\\1|(?:(?:29|30)(/|-|\\.)(?:0?[1,3-9]|1[0-2]|(?:Jan|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))\\2))(?:(?:1[6-9]|[2-9]\\d)?\\d{2})|(?:29(/|-|\\.)(?:0?2|(?:Feb))\\3(?:(?:(?:1[6-9]|[2-9]\\d)?(?:0[48]|[2468][048]|[13579][26])|(?:(?:16|[2468][048]|[3579][26])00))))|(?:0?[1-9]|1\\d|2[0-8])(/|-|\\.)(?:(?:0?[1-9]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep))|(?:1[0-2]|(?:Oct|Nov|Dec)))\\4(?:(?:1[6-9]|[2-9]\\d)?\\d{2})", Pattern.MULTILINE),
-    //Y
-    Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE)
-  )
-
-  def filter_json(json: String): Boolean = {
-    j_filter.exists(f => json.contains(f))
-  }
+  /**
+   * This method should skip the record if the json contains invalid text
+   * defined in the file datacite_filter
+   *
+   * @param json the native Datacite record
+   * @return true if the record should be skipped
+   */
+  def skip_record(json: String): Boolean = {
+    datacite_filter.exists(f => json.contains(f))
+  }

+  @deprecated("this method will be removed", "dhp")
  def toActionSet(item: Oaf): (String, String) = {
    val mapper = new ObjectMapper()

@@ -200,6 +116,8 @@ object DataciteToOAFTransformation {
      case _: Throwable => ""
    }
  }

  def getTypeQualifier(resourceType: String, resourceTypeGeneral: String, schemaOrg: String, vocabularies: VocabularyGroup): (Qualifier, Qualifier) = {
    if (resourceType != null && resourceType.nonEmpty) {
      val typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, resourceType)

@@ -318,11 +236,7 @@ object DataciteToOAFTransformation {
      val p = match_pattern.get._2
      val grantId = m.matcher(awardUri).replaceAll("$2")
      val targetId = s"$p${DHPUtils.md5(grantId)}"
-      List(
-        generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo)
-        // REMOVED INVERSE RELATION since there is a specific method that should generate later
-        // generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo)
-      )
+      List(generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo))
    }
    else
      List()

@@ -331,7 +245,7 @@ object DataciteToOAFTransformation {

  def generateOAF(input: String, ts: Long, dateOfCollection: Long, vocabularies: VocabularyGroup, exportLinks: Boolean): List[Oaf] = {
-    if (filter_json(input))
+    if (skip_record(input))
      return List()

    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

@@ -565,7 +479,7 @@ object DataciteToOAFTransformation {
      rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
      rel.setDataInfo(dataInfo)

-      val subRelType = subRelTypeMapping(r.relationType)._2
+      val subRelType = subRelTypeMapping(r.relationType).relType
      rel.setRelType(REL_TYPE_VALUE)
      rel.setSubRelType(subRelType)
      rel.setRelClass(r.relationType)

@@ -579,18 +493,9 @@ object DataciteToOAFTransformation {
      rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
      rel.getCollectedfrom.asScala.map(c => c.getValue).toList
      rel
-    }).toList
+    })
  }

-  def generateDataInfo(trust: String): DataInfo = {
-    val di = new DataInfo
-    di.setDeletedbyinference(false)
-    di.setInferred(false)
-    di.setInvisible(false)
-    di.setTrust(trust)
-    di.setProvenanceaction(ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER)
-    di
-  }
-
  def generateDSId(input: String): String = {
    val b = StringUtils.substringBefore(input, "::")
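As a quick illustration of the new skip_record guard (a sketch, not code from the commit; the filter entry shown is hypothetical, the real ones live in the datacite_filter classpath resource):

// Suppose the datacite_filter resource contained the line:  "instrument" : "FAKE_INSTRUMENT"
// Any native record whose json contains that substring is then dropped up front:
val suspicious = """{"doi":"10.1234/x", "instrument" : "FAKE_INSTRUMENT"}"""
val regular    = """{"doi":"10.1234/y", "publisher":"Zenodo"}"""

DataciteToOAFTransformation.skip_record(suspicious) // true  -> generateOAF returns List()
DataciteToOAFTransformation.skip_record(regular)    // false -> the record is transformed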
@@ -1,64 +1,94 @@
package eu.dnetlib.dhp.datacite

import com.fasterxml.jackson.databind.ObjectMapper
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations
-import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
-import eu.dnetlib.dhp.common.Constants.MDSTORE_SIZE_PATH
+import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH}
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord}
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import eu.dnetlib.dhp.utils.ISLookupClientFactory
-import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

-import scala.io.Source

-object GenerateDataciteDatasetSpark {
+class GenerateDataciteDatasetSpark(propertyPath: String, args: Array[String], log: Logger) extends AbstractScalaApplication(propertyPath, args, log) {

-  val log: Logger = LoggerFactory.getLogger(GenerateDataciteDatasetSpark.getClass)
-
-  def main(args: Array[String]): Unit = {
-    val conf = new SparkConf
-    val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString)
-    parser.parseArgument(args)
-    val master = parser.get("master")
+  /**
+   * All the Spark applications run this method,
+   * where the whole logic of the Spark node is defined
+   */
+  override def run(): Unit = {

    val sourcePath = parser.get("sourcePath")
+    log.info(s"SourcePath is '$sourcePath'")
    val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks"))
+    log.info(s"exportLinks is '$exportLinks'")
    val isLookupUrl: String = parser.get("isLookupUrl")
    log.info("isLookupUrl: {}", isLookupUrl)

    val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
    val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
-    val spark: SparkSession = SparkSession.builder().config(conf)
-      .appName(GenerateDataciteDatasetSpark.getClass.getSimpleName)
-      .master(master)
-      .getOrCreate()
+    require(vocabularies != null)

+    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+    log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")

+    val mapper = new ObjectMapper()
+    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+    log.info(s"outputBasePath is '$outputBasePath'")
+    val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH"
+    log.info(s"targetPath is '$targetPath'")

+    generateDataciteDataset(sourcePath, exportLinks, vocabularies, targetPath, spark)

+    reportTotalSize(targetPath, outputBasePath)
+  }

+  /**
+   * For working with MDStore we need to store in a file on hdfs the size of
+   * the current dataset
+   * @param targetPath
+   * @param outputBasePath
+   */
+  def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
+    val total_items = spark.read.load(targetPath).count()
+    writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH)
+  }

+  /**
+   * Generate the transformed and cleaned OAF Dataset from the native one
+   *
+   * @param sourcePath   sourcePath of the native Dataset in JSON/Datacite format
+   * @param exportLinks  if true, it generates unresolved links
+   * @param vocabularies vocabularies for cleaning
+   * @param targetPath   the targetPath of the result Dataset
+   */
+  def generateDataciteDataset(sourcePath: String, exportLinks: Boolean, vocabularies: VocabularyGroup, targetPath: String, spark: SparkSession): Unit = {
+    require(spark != null)
    import spark.implicits._

    implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord]
    implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]

-    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
-    val mapper = new ObjectMapper()
-    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
-    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
-
-    log.info("outputBasePath: {}", outputBasePath)
-    val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH"

    spark.read.load(sourcePath).as[DataciteType]
      .filter(d => d.isActive)
      .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks))
      .filter(d => d != null)
      .flatMap(i => fixRelations(i)).filter(i => i != null)
      .write.mode(SaveMode.Overwrite).save(targetPath)
+  }
+}

-    val total_items = spark.read.load(targetPath).as[Oaf].count()
-    writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH)
+object GenerateDataciteDatasetSpark {
+
+  val log: Logger = LoggerFactory.getLogger(GenerateDataciteDatasetSpark.getClass)
+
+  def main(args: Array[String]): Unit = {
+    new GenerateDataciteDatasetSpark("/eu/dnetlib/dhp/datacite/generate_dataset_params.json", args, log).initialize().run()
  }
}
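For context, a sketch of how the refactored job could be launched (hypothetical values throughout: the flag names are assumed to match the parameter names read via parser.get and actually live in /eu/dnetlib/dhp/datacite/generate_dataset_params.json; the paths and URL are placeholders):

GenerateDataciteDatasetSpark.main(Array(
  "--master", "local[*]",
  "--sourcePath", "/tmp/datacite/native",                          // hypothetical input path
  "--exportLinks", "true",
  "--isLookupUrl", "http://localhost:8280/is/services/isLookUp",   // hypothetical IS lookup URL
  "--mdstoreOutputVersion", """{"hdfsPath":"/tmp/mdstore/v1"}"""   // minimal MDStoreVersion json, hypothetical
))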
@@ -7,3 +7,6 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.apache.spark=FATAL
+log4j.logger.org.spark_project=FATAL
@@ -4,21 +4,29 @@ package eu.dnetlib.dhp.datacite

import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.Oaf
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.{col, count}
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.junit.jupiter.MockitoExtension
+import org.slf4j.{Logger, LoggerFactory}
+
+import java.nio.file.{Files, Path}
import java.text.SimpleDateFormat
import java.util.Locale
import scala.io.Source
+import org.junit.jupiter.api.Assertions._

@ExtendWith(Array(classOf[MockitoExtension]))
class DataciteToOAFTest extends AbstractVocabularyTest {

+  var workingDir: Path = null
+  val log: Logger = LoggerFactory.getLogger(getClass)

  @BeforeEach
  def setUp(): Unit = {

+    workingDir = Files.createTempDirectory(getClass.getSimpleName)
    super.setUpVocabulary()
  }

@@ -31,6 +39,51 @@ class DataciteToOAFTest extends AbstractVocabularyTest {
    println(dt.getTime)

  }

+  @Test
+  def testConvert(): Unit = {
+
+    val path = getClass.getResource("/eu/dnetlib/dhp/actionmanager/datacite/dataset").getPath
+
+    val conf = new SparkConf()
+    val spark: SparkSession = SparkSession.builder().config(conf)
+      .appName(getClass.getSimpleName)
+      .master("local[*]")
+      .getOrCreate()
+
+    implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
+    val instance = new GenerateDataciteDatasetSpark(null, null, log)
+    val targetPath = s"$workingDir/result"
+
+    instance.generateDataciteDataset(path, exportLinks = true, vocabularies, targetPath, spark)
+
+    import spark.implicits._
+
+    val nativeSize = spark.read.load(path).count()
+
+    assertEquals(100, nativeSize)
+
+    val result: Dataset[Oaf] = spark.read.load(targetPath).as[Oaf]
+
+    result.map(s => s.getClass.getSimpleName).groupBy(col("value").alias("class")).agg(count("value").alias("Total")).show(false)
+
+    val t = spark.read.load(targetPath).count()
+
+    assertTrue(t > 0)
+
+    spark.stop()
+  }

@@ -38,8 +91,6 @@ class DataciteToOAFTest extends AbstractVocabularyTest {
  def testMapping(): Unit = {
    val record = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/record.json")).mkString
-
-
    val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
    val res: List[Oaf] = DataciteToOAFTransformation.generateOAF(record, 0L, 0L, vocabularies, true)