From 2164a2a8898da176799bab3815244b234746eb45 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 25 Nov 2021 10:54:13 +0100 Subject: [PATCH] Datacite: Code Refactor generated a general SparkApplication Scala where all the spark scala have to inherit Commented a little the Datacite transformation code --- .../AbstractScalaApplication.scala | 37 +++++ .../application/SparkScalaApplication.scala | 35 +++++ .../dhp/datacite/DataciteModelConstants.scala | 134 ++++++++++++++++++ .../DataciteToOAFTransformation.scala | 127 +++-------------- .../GenerateDataciteDatasetSpark.scala | 86 +++++++---- .../src/main/resources/log4j.properties | 3 + .../dhp/datacite/DataciteToOAFTest.scala | 57 +++++++- 7 files changed, 337 insertions(+), 142 deletions(-) create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/application/AbstractScalaApplication.scala create mode 100644 dhp-common/src/main/java/eu/dnetlib/dhp/application/SparkScalaApplication.scala create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/AbstractScalaApplication.scala b/dhp-common/src/main/java/eu/dnetlib/dhp/application/AbstractScalaApplication.scala new file mode 100644 index 000000000..44dad93eb --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/AbstractScalaApplication.scala @@ -0,0 +1,37 @@ +package eu.dnetlib.dhp.application + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.slf4j.Logger + +abstract class AbstractScalaApplication (val propertyPath:String, val args:Array[String], log:Logger) extends SparkScalaApplication { + + var parser: ArgumentApplicationParser = null + + var spark:SparkSession = null + + + def initialize():SparkScalaApplication = { + parser = parseArguments(args) + spark = createSparkSession() + this + } + + /** + * Utility for creating a spark session starting from parser + * + * @return a spark Session + */ + private def createSparkSession():SparkSession = { + require(parser!= null) + + val conf:SparkConf = new SparkConf() + val master = parser.get("master") + log.info(s"Creating Spark session: Master: $master") + SparkSession.builder().config(conf) + .appName(getClass.getSimpleName) + .master(master) + .getOrCreate() + } + +} \ No newline at end of file diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/SparkScalaApplication.scala b/dhp-common/src/main/java/eu/dnetlib/dhp/application/SparkScalaApplication.scala new file mode 100644 index 000000000..247bacac0 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/SparkScalaApplication.scala @@ -0,0 +1,35 @@ +package eu.dnetlib.dhp.application + +import scala.io.Source + +/** + * This is the main Interface SparkApplication + * where all the Spark Scala class should inherit + * + */ +trait SparkScalaApplication { + /** + * This is the path in the classpath of the json + * describes all the argument needed to run + */ + val propertyPath: String + + /** + * Utility to parse the arguments using the + * property json in the classpath identified from + * the variable propertyPath + * + * @param args the list of arguments + */ + def parseArguments(args: Array[String]): ArgumentApplicationParser = { + val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream(propertyPath)).mkString) + parser.parseArgument(args) + parser + } + + /** + * Here all the spark applications runs this method + * where the whole logic of the spark node is defined + */ + def run(): Unit +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala new file mode 100644 index 000000000..0685a04a3 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteModelConstants.scala @@ -0,0 +1,134 @@ +package eu.dnetlib.dhp.datacite + +import eu.dnetlib.dhp.schema.common.ModelConstants +import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue} +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils + +import java.io.InputStream +import java.time.format.DateTimeFormatter +import java.util.Locale +import java.util.regex.Pattern +import scala.io.Source + +/** + * This class represent the dataModel of the input Dataset of Datacite + * @param doi THE DOI + * @param timestamp timestamp of last update date + * @param isActive the record is active or deleted + * @param json the json native records + */ +case class DataciteType(doi: String, timestamp: Long, isActive: Boolean, json: String) {} + +/* + The following class are utility class used for the mapping from + json datacite to OAF Shema + */ +case class RelatedIdentifierType(relationType: String, relatedIdentifier: String, relatedIdentifierType: String) {} + +case class NameIdentifiersType(nameIdentifierScheme: Option[String], schemeUri: Option[String], nameIdentifier: Option[String]) {} + +case class CreatorType(nameType: Option[String], nameIdentifiers: Option[List[NameIdentifiersType]], name: Option[String], familyName: Option[String], givenName: Option[String], affiliation: Option[List[String]]) {} + +case class TitleType(title: Option[String], titleType: Option[String], lang: Option[String]) {} + +case class SubjectType(subject: Option[String], subjectScheme: Option[String]) {} + +case class DescriptionType(descriptionType: Option[String], description: Option[String]) {} + +case class FundingReferenceType(funderIdentifierType: Option[String], awardTitle: Option[String], awardUri: Option[String], funderName: Option[String], funderIdentifier: Option[String], awardNumber: Option[String]) {} + +case class DateType(date: Option[String], dateType: Option[String]) {} + +case class OAFRelations(relation:String, inverse:String, relType:String) + + +class DataciteModelConstants extends Serializable { + +} + +object DataciteModelConstants { + + val REL_TYPE_VALUE:String = "resultResult" + val DATE_RELATION_KEY = "RelationDate" + val DATACITE_FILTER_PATH = "/eu/dnetlib/dhp/datacite/datacite_filter" + val DOI_CLASS = "doi" + val SUBJ_CLASS = "keywords" + val DATACITE_NAME = "Datacite" + val dataInfo: DataInfo = dataciteDataInfo("0.9") + val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue(ModelConstants.DATACITE_ID, DATACITE_NAME) + + val subRelTypeMapping: Map[String,OAFRelations] = Map( + ModelConstants.REFERENCES -> OAFRelations(ModelConstants.REFERENCES, ModelConstants.IS_REFERENCED_BY, ModelConstants.RELATIONSHIP), + ModelConstants.IS_REFERENCED_BY -> OAFRelations(ModelConstants.IS_REFERENCED_BY,ModelConstants.REFERENCES, ModelConstants.RELATIONSHIP), + + ModelConstants.IS_SUPPLEMENTED_BY -> OAFRelations(ModelConstants.IS_SUPPLEMENTED_BY,ModelConstants.IS_SUPPLEMENT_TO,ModelConstants.SUPPLEMENT), + ModelConstants.IS_SUPPLEMENT_TO -> OAFRelations(ModelConstants.IS_SUPPLEMENT_TO,ModelConstants.IS_SUPPLEMENTED_BY,ModelConstants.SUPPLEMENT), + + ModelConstants.HAS_PART -> OAFRelations(ModelConstants.HAS_PART,ModelConstants.IS_PART_OF, ModelConstants.PART), + ModelConstants.IS_PART_OF -> OAFRelations(ModelConstants.IS_PART_OF,ModelConstants.HAS_PART, ModelConstants.PART), + + ModelConstants.IS_VERSION_OF-> OAFRelations(ModelConstants.IS_VERSION_OF,ModelConstants.HAS_VERSION,ModelConstants.VERSION), + ModelConstants.HAS_VERSION-> OAFRelations(ModelConstants.HAS_VERSION,ModelConstants.IS_VERSION_OF,ModelConstants.VERSION), + + ModelConstants.IS_IDENTICAL_TO -> OAFRelations(ModelConstants.IS_IDENTICAL_TO,ModelConstants.IS_IDENTICAL_TO, ModelConstants.RELATIONSHIP), + + ModelConstants.IS_CONTINUED_BY -> OAFRelations(ModelConstants.IS_CONTINUED_BY,ModelConstants.CONTINUES, ModelConstants.RELATIONSHIP), + ModelConstants.CONTINUES -> OAFRelations(ModelConstants.CONTINUES,ModelConstants.IS_CONTINUED_BY, ModelConstants.RELATIONSHIP), + + ModelConstants.IS_NEW_VERSION_OF-> OAFRelations(ModelConstants.IS_NEW_VERSION_OF,ModelConstants.IS_PREVIOUS_VERSION_OF, ModelConstants.VERSION), + ModelConstants.IS_PREVIOUS_VERSION_OF ->OAFRelations(ModelConstants.IS_PREVIOUS_VERSION_OF,ModelConstants.IS_NEW_VERSION_OF, ModelConstants.VERSION), + + ModelConstants.IS_DOCUMENTED_BY -> OAFRelations(ModelConstants.IS_DOCUMENTED_BY,ModelConstants.DOCUMENTS, ModelConstants.RELATIONSHIP), + ModelConstants.DOCUMENTS -> OAFRelations(ModelConstants.DOCUMENTS,ModelConstants.IS_DOCUMENTED_BY, ModelConstants.RELATIONSHIP), + + ModelConstants.IS_SOURCE_OF -> OAFRelations(ModelConstants.IS_SOURCE_OF,ModelConstants.IS_DERIVED_FROM, ModelConstants.VERSION), + ModelConstants.IS_DERIVED_FROM -> OAFRelations(ModelConstants.IS_DERIVED_FROM,ModelConstants.IS_SOURCE_OF, ModelConstants.VERSION), + + ModelConstants.CITES -> OAFRelations(ModelConstants.CITES,ModelConstants.IS_CITED_BY, ModelConstants.CITATION), + ModelConstants.IS_CITED_BY -> OAFRelations(ModelConstants.IS_CITED_BY,ModelConstants.CITES, ModelConstants.CITATION), + + ModelConstants.IS_VARIANT_FORM_OF -> OAFRelations(ModelConstants.IS_VARIANT_FORM_OF,ModelConstants.IS_DERIVED_FROM, ModelConstants.VERSION), + ModelConstants.IS_OBSOLETED_BY -> OAFRelations(ModelConstants.IS_OBSOLETED_BY,ModelConstants.IS_NEW_VERSION_OF, ModelConstants.VERSION), + + ModelConstants.REVIEWS -> OAFRelations(ModelConstants.REVIEWS,ModelConstants.IS_REVIEWED_BY, ModelConstants.REVIEW), + ModelConstants.IS_REVIEWED_BY -> OAFRelations(ModelConstants.IS_REVIEWED_BY,ModelConstants.REVIEWS, ModelConstants.REVIEW), + + ModelConstants.DOCUMENTS -> OAFRelations(ModelConstants.DOCUMENTS,ModelConstants.IS_DOCUMENTED_BY, ModelConstants.RELATIONSHIP), + ModelConstants.IS_DOCUMENTED_BY -> OAFRelations(ModelConstants.IS_DOCUMENTED_BY,ModelConstants.DOCUMENTS, ModelConstants.RELATIONSHIP), + + ModelConstants.COMPILES -> OAFRelations(ModelConstants.COMPILES,ModelConstants.IS_COMPILED_BY, ModelConstants.RELATIONSHIP), + ModelConstants.IS_COMPILED_BY -> OAFRelations(ModelConstants.IS_COMPILED_BY,ModelConstants.COMPILES, ModelConstants.RELATIONSHIP) + ) + + + val datacite_filter: List[String] = { + val stream: InputStream = getClass.getResourceAsStream(DATACITE_FILTER_PATH) + require(stream!= null) + Source.fromInputStream(stream).getLines().toList + } + + + def dataciteDataInfo(trust: String): DataInfo = OafMapperUtils.dataInfo(false,null, false, false, ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, trust) + + val df_en: DateTimeFormatter = DateTimeFormatter.ofPattern("[MM-dd-yyyy][MM/dd/yyyy][dd-MM-yy][dd-MMM-yyyy][dd/MMM/yyyy][dd-MMM-yy][dd/MMM/yy][dd-MM-yy][dd/MM/yy][dd-MM-yyyy][dd/MM/yyyy][yyyy-MM-dd][yyyy/MM/dd]", Locale.ENGLISH) + val df_it: DateTimeFormatter = DateTimeFormatter.ofPattern("[dd-MM-yyyy][dd/MM/yyyy]", Locale.ITALIAN) + + val funder_regex: List[(Pattern, String)] = List( + (Pattern.compile("(info:eu-repo/grantagreement/ec/h2020/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda__h2020::"), + (Pattern.compile("(info:eu-repo/grantagreement/ec/fp7/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda_______::") + + ) + + val Date_regex: List[Pattern] = List( + //Y-M-D + Pattern.compile("(18|19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])", Pattern.MULTILINE), + //M-D-Y + Pattern.compile("((0[1-9]|1[012])|([1-9]))([- /.])(0[1-9]|[12][0-9]|3[01])([- /.])(18|19|20)?\\d\\d", Pattern.MULTILINE), + //D-M-Y + Pattern.compile("(?:(?:31(/|-|\\.)(?:0?[13578]|1[02]|(?:Jan|Mar|May|Jul|Aug|Oct|Dec)))\\1|(?:(?:29|30)(/|-|\\.)(?:0?[1,3-9]|1[0-2]|(?:Jan|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))\\2))(?:(?:1[6-9]|[2-9]\\d)?\\d{2})|(?:29(/|-|\\.)(?:0?2|(?:Feb))\\3(?:(?:(?:1[6-9]|[2-9]\\d)?(?:0[48]|[2468][048]|[13579][26])|(?:(?:16|[2468][048]|[3579][26])00))))|(?:0?[1-9]|1\\d|2[0-8])(/|-|\\.)(?:(?:0?[1-9]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep))|(?:1[0-2]|(?:Oct|Nov|Dec)))\\4(?:(?:1[6-9]|[2-9]\\d)?\\d{2})", Pattern.MULTILINE), + //Y + Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE) + ) + + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala index 1af72e8d3..6b4deef0a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/DataciteToOAFTransformation.scala @@ -2,6 +2,7 @@ package eu.dnetlib.dhp.datacite import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup +import eu.dnetlib.dhp.datacite.DataciteModelConstants._ import eu.dnetlib.dhp.schema.action.AtomicAction import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils} @@ -12,115 +13,30 @@ import org.json4s.DefaultFormats import org.json4s.JsonAST.{JField, JObject, JString} import org.json4s.jackson.JsonMethods.parse -import java.nio.charset.CodingErrorAction import java.text.SimpleDateFormat import java.time.LocalDate import java.time.chrono.ThaiBuddhistDate import java.time.format.DateTimeFormatter -import java.util.regex.Pattern import java.util.{Date, Locale} import scala.collection.JavaConverters._ -import scala.io.{Codec, Source} -import scala.language.postfixOps -case class DataciteType(doi: String, timestamp: Long, isActive: Boolean, json: String) {} - -case class RelatedIdentifierType(relationType: String, relatedIdentifier: String, relatedIdentifierType: String) {} - -case class NameIdentifiersType(nameIdentifierScheme: Option[String], schemeUri: Option[String], nameIdentifier: Option[String]) {} - -case class CreatorType(nameType: Option[String], nameIdentifiers: Option[List[NameIdentifiersType]], name: Option[String], familyName: Option[String], givenName: Option[String], affiliation: Option[List[String]]) {} - -case class TitleType(title: Option[String], titleType: Option[String], lang: Option[String]) {} - -case class SubjectType(subject: Option[String], subjectScheme: Option[String]) {} - -case class DescriptionType(descriptionType: Option[String], description: Option[String]) {} - -case class FundingReferenceType(funderIdentifierType: Option[String], awardTitle: Option[String], awardUri: Option[String], funderName: Option[String], funderIdentifier: Option[String], awardNumber: Option[String]) {} - -case class DateType(date: Option[String], dateType: Option[String]) {} - -//case class HostedByMapType(openaire_id: String, datacite_name: String, official_name: String, similarity: Option[Float]) {} object DataciteToOAFTransformation { - val REL_TYPE_VALUE:String = "resultResult" - val DATE_RELATION_KEY = "RelationDate" - - val subRelTypeMapping: Map[String,(String,String)] = Map( - "References" ->("IsReferencedBy","relationship"), - "IsSupplementTo" ->("IsSupplementedBy","supplement"), - "IsPartOf" ->("HasPart","part"), - "HasPart" ->("IsPartOf","part"), - "IsVersionOf" ->("HasVersion","version"), - "HasVersion" ->("IsVersionOf","version"), - "IsIdenticalTo" ->("IsIdenticalTo","relationship"), - "IsPreviousVersionOf" ->("IsNewVersionOf","version"), - "IsContinuedBy" ->("Continues","relationship"), - "Continues" ->("IsContinuedBy","relationship"), - "IsNewVersionOf" ->("IsPreviousVersionOf","version"), - "IsSupplementedBy" ->("IsSupplementTo","supplement"), - "IsDocumentedBy" ->("Documents","relationship"), - "IsSourceOf" ->("IsDerivedFrom","relationship"), - "Cites" ->("IsCitedBy","citation"), - "IsCitedBy" ->("Cites","citation"), - "IsDerivedFrom" ->("IsSourceOf","relationship"), - "IsVariantFormOf" ->("IsDerivedFrom","version"), - "IsReferencedBy" ->("References","relationship"), - "IsObsoletedBy" ->("IsNewVersionOf","version"), - "Reviews" ->("IsReviewedBy","review"), - "Documents" ->("IsDocumentedBy","relationship"), - "IsCompiledBy" ->("Compiles","relationship"), - "Compiles" ->("IsCompiledBy","relationship"), - "IsReviewedBy" ->("Reviews","review") - ) - - implicit val codec: Codec = Codec("UTF-8") - codec.onMalformedInput(CodingErrorAction.REPLACE) - codec.onUnmappableCharacter(CodingErrorAction.REPLACE) - - val DOI_CLASS = "doi" - val SUBJ_CLASS = "keywords" - - - val j_filter: List[String] = { - val s = Source.fromInputStream(getClass.getResourceAsStream("datacite_filter")).mkString - s.lines.toList - } - val mapper = new ObjectMapper() - val dataInfo: DataInfo = generateDataInfo("0.9") - val DATACITE_COLLECTED_FROM: KeyValue = OafMapperUtils.keyValue(ModelConstants.DATACITE_ID, "Datacite") - - - val df_en: DateTimeFormatter = DateTimeFormatter.ofPattern("[MM-dd-yyyy][MM/dd/yyyy][dd-MM-yy][dd-MMM-yyyy][dd/MMM/yyyy][dd-MMM-yy][dd/MMM/yy][dd-MM-yy][dd/MM/yy][dd-MM-yyyy][dd/MM/yyyy][yyyy-MM-dd][yyyy/MM/dd]", Locale.ENGLISH) - val df_it: DateTimeFormatter = DateTimeFormatter.ofPattern("[dd-MM-yyyy][dd/MM/yyyy]", Locale.ITALIAN) - - val funder_regex: List[(Pattern, String)] = List( - (Pattern.compile("(info:eu-repo/grantagreement/ec/h2020/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda__h2020::"), - (Pattern.compile("(info:eu-repo/grantagreement/ec/fp7/)(\\d\\d\\d\\d\\d\\d)(.*)", Pattern.MULTILINE | Pattern.CASE_INSENSITIVE), "40|corda_______::") - - ) - - val Date_regex: List[Pattern] = List( - //Y-M-D - Pattern.compile("(18|19|20)\\d\\d([- /.])(0[1-9]|1[012])\\2(0[1-9]|[12][0-9]|3[01])", Pattern.MULTILINE), - //M-D-Y - Pattern.compile("((0[1-9]|1[012])|([1-9]))([- /.])(0[1-9]|[12][0-9]|3[01])([- /.])(18|19|20)?\\d\\d", Pattern.MULTILINE), - //D-M-Y - Pattern.compile("(?:(?:31(/|-|\\.)(?:0?[13578]|1[02]|(?:Jan|Mar|May|Jul|Aug|Oct|Dec)))\\1|(?:(?:29|30)(/|-|\\.)(?:0?[1,3-9]|1[0-2]|(?:Jan|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec))\\2))(?:(?:1[6-9]|[2-9]\\d)?\\d{2})|(?:29(/|-|\\.)(?:0?2|(?:Feb))\\3(?:(?:(?:1[6-9]|[2-9]\\d)?(?:0[48]|[2468][048]|[13579][26])|(?:(?:16|[2468][048]|[3579][26])00))))|(?:0?[1-9]|1\\d|2[0-8])(/|-|\\.)(?:(?:0?[1-9]|(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep))|(?:1[0-2]|(?:Oct|Nov|Dec)))\\4(?:(?:1[6-9]|[2-9]\\d)?\\d{2})", Pattern.MULTILINE), - //Y - Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE) - ) - - - def filter_json(json: String): Boolean = { - j_filter.exists(f => json.contains(f)) + /** + * This method should skip record if json contains invalid text + * defined in gile datacite_filter + * @param json + * @return True if the record should be skipped + */ + def skip_record(json: String): Boolean = { + datacite_filter.exists(f => json.contains(f)) } + @deprecated("this method will be removed", "dhp") def toActionSet(item: Oaf): (String, String) = { val mapper = new ObjectMapper() @@ -200,6 +116,8 @@ object DataciteToOAFTransformation { case _: Throwable => "" } } + + def getTypeQualifier(resourceType: String, resourceTypeGeneral: String, schemaOrg: String, vocabularies: VocabularyGroup): (Qualifier, Qualifier) = { if (resourceType != null && resourceType.nonEmpty) { val typeQualifier = vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, resourceType) @@ -318,11 +236,7 @@ object DataciteToOAFTransformation { val p = match_pattern.get._2 val grantId = m.matcher(awardUri).replaceAll("$2") val targetId = s"$p${DHPUtils.md5(grantId)}" - List( - generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo) -// REMOVED INVERSE RELATION since there is a specific method that should generate later -// generateRelation(targetId, sourceId, "produces", DATACITE_COLLECTED_FROM, dataInfo) - ) + List( generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo) ) } else List() @@ -331,7 +245,7 @@ object DataciteToOAFTransformation { def generateOAF(input: String, ts: Long, dateOfCollection: Long, vocabularies: VocabularyGroup, exportLinks: Boolean): List[Oaf] = { - if (filter_json(input)) + if (skip_record(input)) return List() implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats @@ -565,7 +479,7 @@ object DataciteToOAFTransformation { rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava) rel.setDataInfo(dataInfo) - val subRelType = subRelTypeMapping(r.relationType)._2 + val subRelType = subRelTypeMapping(r.relationType).relType rel.setRelType(REL_TYPE_VALUE) rel.setSubRelType(subRelType) rel.setRelClass(r.relationType) @@ -579,18 +493,9 @@ object DataciteToOAFTransformation { rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava) rel.getCollectedfrom.asScala.map(c => c.getValue).toList rel - }).toList + }) } - def generateDataInfo(trust: String): DataInfo = { - val di = new DataInfo - di.setDeletedbyinference(false) - di.setInferred(false) - di.setInvisible(false) - di.setTrust(trust) - di.setProvenanceaction(ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER) - di - } def generateDSId(input: String): String = { val b = StringUtils.substringBefore(input, "::") diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala index a63627d1c..e1607ee9c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala @@ -1,64 +1,94 @@ package eu.dnetlib.dhp.datacite import com.fasterxml.jackson.databind.ObjectMapper -import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.application.AbstractScalaApplication import eu.dnetlib.dhp.collection.CollectionUtils.fixRelations -import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH -import eu.dnetlib.dhp.common.Constants.MDSTORE_SIZE_PATH +import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH} import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.mdstore.{MDStoreVersion, MetadataRecord} import eu.dnetlib.dhp.schema.oaf.Oaf import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile import eu.dnetlib.dhp.utils.ISLookupClientFactory -import org.apache.spark.SparkConf import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} -import scala.io.Source -object GenerateDataciteDatasetSpark { +class GenerateDataciteDatasetSpark (propertyPath:String, args:Array[String], log:Logger) extends AbstractScalaApplication(propertyPath, args, log:Logger) { + /** + * Here all the spark applications runs this method + * where the whole logic of the spark node is defined + */ + override def run(): Unit = { - val log: Logger = LoggerFactory.getLogger(GenerateDataciteDatasetSpark.getClass) - - def main(args: Array[String]): Unit = { - val conf = new SparkConf - val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/datacite/generate_dataset_params.json")).mkString) - parser.parseArgument(args) - val master = parser.get("master") val sourcePath = parser.get("sourcePath") + log.info(s"SourcePath is '$sourcePath'") val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks")) + log.info(s"exportLinks is '$exportLinks'") val isLookupUrl: String = parser.get("isLookupUrl") log.info("isLookupUrl: {}", isLookupUrl) val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl) val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService) - val spark: SparkSession = SparkSession.builder().config(conf) - .appName(GenerateDataciteDatasetSpark.getClass.getSimpleName) - .master(master) - .getOrCreate() + require(vocabularies != null) + val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") + log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'") + + val mapper = new ObjectMapper() + val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) + val outputBasePath = cleanedMdStoreVersion.getHdfsPath + log.info(s"outputBasePath is '$outputBasePath'") + val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH" + log.info(s"targetPath is '$targetPath'") + + generateDataciteDataset(sourcePath, exportLinks, vocabularies, targetPath, spark) + + reportTotalSize(targetPath, outputBasePath) + } + + + /** + * For working with MDStore we need to store in a file on hdfs the size of + * the current dataset + * @param targetPath + * @param outputBasePath + */ + def reportTotalSize( targetPath: String, outputBasePath: String ):Unit = { + val total_items = spark.read.load(targetPath).count() + writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH) + } + + /** + * Generate the transformed and cleaned OAF Dataset from the native one + + * @param sourcePath sourcePath of the native Dataset in format JSON/Datacite + * @param exportLinks If true it generates unresolved links + * @param vocabularies vocabularies for cleaning + * @param targetPath the targetPath of the result Dataset + */ + def generateDataciteDataset(sourcePath: String, exportLinks: Boolean, vocabularies: VocabularyGroup, targetPath: String, spark:SparkSession):Unit = { + require(spark!= null) import spark.implicits._ implicit val mrEncoder: Encoder[MetadataRecord] = Encoders.kryo[MetadataRecord] implicit val resEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] - - val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") - val mapper = new ObjectMapper() - val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) - val outputBasePath = cleanedMdStoreVersion.getHdfsPath - - log.info("outputBasePath: {}", outputBasePath) - val targetPath = s"$outputBasePath/$MDSTORE_DATA_PATH" - spark.read.load(sourcePath).as[DataciteType] .filter(d => d.isActive) .flatMap(d => DataciteToOAFTransformation.generateOAF(d.json, d.timestamp, d.timestamp, vocabularies, exportLinks)) .filter(d => d != null) .flatMap(i => fixRelations(i)).filter(i => i != null) .write.mode(SaveMode.Overwrite).save(targetPath) + } - val total_items = spark.read.load(targetPath).as[Oaf].count() - writeHdfsFile(spark.sparkContext.hadoopConfiguration, s"$total_items", outputBasePath + MDSTORE_SIZE_PATH) +} + + +object GenerateDataciteDatasetSpark { + + val log: Logger = LoggerFactory.getLogger(GenerateDataciteDatasetSpark.getClass) + + def main(args: Array[String]): Unit = { + new GenerateDataciteDatasetSpark("/eu/dnetlib/dhp/datacite/generate_dataset_params.json", args, log).initialize().run() } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties b/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties index 63cba917e..81458d1f7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/log4j.properties @@ -7,3 +7,6 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. log4j.appender.A1.layout=org.apache.log4j.PatternLayout log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +log4j.logger.org.apache.spark=FATAL +log4j.logger.org.spark_project=FATAL diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala index c7c6c6a92..50fe73d9a 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala @@ -4,21 +4,29 @@ package eu.dnetlib.dhp.datacite import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature} import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest import eu.dnetlib.dhp.schema.oaf.Oaf +import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.{col, count} +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession} import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{BeforeEach, Test} import org.mockito.junit.jupiter.MockitoExtension +import org.slf4j.{Logger, LoggerFactory} +import java.nio.file.{Files, Path} import java.text.SimpleDateFormat import java.util.Locale import scala.io.Source - +import org.junit.jupiter.api.Assertions._ @ExtendWith(Array(classOf[MockitoExtension])) class DataciteToOAFTest extends AbstractVocabularyTest{ + var workingDir:Path= null + val log: Logger = LoggerFactory.getLogger(getClass) @BeforeEach def setUp() :Unit = { + workingDir= Files.createTempDirectory(getClass.getSimpleName) super.setUpVocabulary() } @@ -31,6 +39,51 @@ class DataciteToOAFTest extends AbstractVocabularyTest{ println(dt.getTime) + } + + + @Test + def testConvert(): Unit = { + + + val path = getClass.getResource("/eu/dnetlib/dhp/actionmanager/datacite/dataset").getPath + + val conf = new SparkConf() + val spark:SparkSession = SparkSession.builder().config(conf) + .appName(getClass.getSimpleName) + .master("local[*]") + .getOrCreate() + + + + implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf] + val instance = new GenerateDataciteDatasetSpark(null, null, log) + val targetPath = s"$workingDir/result" + + instance.generateDataciteDataset(path, exportLinks = true, vocabularies,targetPath, spark) + + import spark.implicits._ + + val nativeSize =spark.read.load(path).count() + + + assertEquals(100, nativeSize) + + val result:Dataset[Oaf] = spark.read.load(targetPath).as[Oaf] + + + result.map(s => s.getClass.getSimpleName).groupBy(col("value").alias("class")).agg(count("value").alias("Total")).show(false) + + val t = spark.read.load(targetPath).count() + + assertTrue(t >0) + + + spark.stop() + + + + } @@ -38,8 +91,6 @@ class DataciteToOAFTest extends AbstractVocabularyTest{ def testMapping() :Unit = { val record =Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/record.json")).mkString - - val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT) val res:List[Oaf] =DataciteToOAFTransformation.generateOAF(record, 0L,0L, vocabularies, true )