[scala-refactor] Module dhp-aggregation:

Moved all scala source into src/main/scala and src/test/scala
This commit is contained in:
Sandro La Bruzzo 2021-12-06 11:26:36 +01:00
parent 0fa0ce33d6
commit 7af0bbd0b1
19 changed files with 39 additions and 197 deletions

View File

@ -1,69 +0,0 @@
package eu.dnetlib.dhp.actionmanager.scholix
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation, Result}
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object SparkCreateActionset {
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/generate_actionset.json")).mkString)
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
val workingDirFolder = parser.get("workingDirFolder")
log.info(s"workingDirFolder -> $workingDirFolder")
implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val resultEncoders: Encoder[Result] = Encoders.kryo[Result]
implicit val relationEncoders: Encoder[Relation] = Encoders.kryo[Relation]
import spark.implicits._
val relation = spark.read.load(s"$sourcePath/relation").as[Relation]
relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.flatMap(r => List(r.getSource, r.getTarget)).distinct().write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/id_relation")
val idRelation = spark.read.load(s"$workingDirFolder/id_relation").as[String]
log.info("extract source and target Identifier involved in relations")
log.info("save relation filtered")
relation.filter(r => (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.write.mode(SaveMode.Overwrite).save(s"$workingDirFolder/actionSetOaf")
log.info("saving entities")
val entities: Dataset[(String, Result)] = spark.read.load(s"$sourcePath/entities/*").as[Result].map(p => (p.getId, p))(Encoders.tuple(Encoders.STRING, resultEncoders))
entities
.joinWith(idRelation, entities("_1").equalTo(idRelation("value")))
.map(p => p._1._2)
.write.mode(SaveMode.Append).save(s"$workingDirFolder/actionSetOaf")
}
}

View File

@ -1,86 +0,0 @@
package eu.dnetlib.dhp.actionmanager.scholix
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{Oaf, Dataset => OafDataset,Publication, Software, OtherResearchProduct, Relation}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source
object SparkSaveActionSet {
def toActionSet(item: Oaf): (String, String) = {
val mapper = new ObjectMapper()
item match {
case dataset: OafDataset =>
val a: AtomicAction[OafDataset] = new AtomicAction[OafDataset]
a.setClazz(classOf[OafDataset])
a.setPayload(dataset)
(dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
case publication: Publication =>
val a: AtomicAction[Publication] = new AtomicAction[Publication]
a.setClazz(classOf[Publication])
a.setPayload(publication)
(publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
case software: Software =>
val a: AtomicAction[Software] = new AtomicAction[Software]
a.setClazz(classOf[Software])
a.setPayload(software)
(software.getClass.getCanonicalName, mapper.writeValueAsString(a))
case orp: OtherResearchProduct =>
val a: AtomicAction[OtherResearchProduct] = new AtomicAction[OtherResearchProduct]
a.setClazz(classOf[OtherResearchProduct])
a.setPayload(orp)
(orp.getClass.getCanonicalName, mapper.writeValueAsString(a))
case relation: Relation =>
val a: AtomicAction[Relation] = new AtomicAction[Relation]
a.setClazz(classOf[Relation])
a.setPayload(relation)
(relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
case _ =>
null
}
}
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/actionset/save_actionset.json")).mkString)
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
log.info(s"sourcePath -> $sourcePath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
implicit val oafEncoders: Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val tEncoder: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
spark.read.load(sourcePath).as[Oaf]
.map(o => toActionSet(o))
.filter(o => o != null)
.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$targetPath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec])
}
}

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.datacite
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue}
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue}
import java.io.InputStream
import java.time.format.DateTimeFormatter

View File

@ -6,7 +6,7 @@ import eu.dnetlib.dhp.datacite.DataciteModelConstants._
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils}
import eu.dnetlib.dhp.schema.oaf.{AccessRight, Author, DataInfo, Instance, KeyValue, Oaf, OtherResearchProduct, Publication, Qualifier, Relation, Result, Software, StructuredProperty, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset, _}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.lang3.StringUtils
import org.json4s.DefaultFormats
@ -29,6 +29,7 @@ object DataciteToOAFTransformation {
/**
* This method should skip record if json contains invalid text
* defined in gile datacite_filter
*
* @param json
* @return True if the record should be skipped
*/
@ -107,9 +108,9 @@ object DataciteToOAFTransformation {
d
}
def fix_thai_date(input:String, format:String) :String = {
def fix_thai_date(input: String, format: String): String = {
try {
val a_date = LocalDate.parse(input,DateTimeFormatter.ofPattern(format))
val a_date = LocalDate.parse(input, DateTimeFormatter.ofPattern(format))
val d = ThaiBuddhistDate.of(a_date.getYear, a_date.getMonth.getValue, a_date.getDayOfMonth)
LocalDate.from(d).toString
} catch {
@ -236,7 +237,7 @@ object DataciteToOAFTransformation {
val p = match_pattern.get._2
val grantId = m.matcher(awardUri).replaceAll("$2")
val targetId = s"$p${DHPUtils.md5(grantId)}"
List( generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo) )
List(generateRelation(sourceId, targetId, "isProducedBy", DATACITE_COLLECTED_FROM, dataInfo))
}
else
List()
@ -335,15 +336,15 @@ object DataciteToOAFTransformation {
.map(d => d.get)
if (a_date.isDefined) {
if(doi.startsWith("10.14457"))
result.setEmbargoenddate(OafMapperUtils.field(fix_thai_date(a_date.get,"[yyyy-MM-dd]"), null))
if (doi.startsWith("10.14457"))
result.setEmbargoenddate(OafMapperUtils.field(fix_thai_date(a_date.get, "[yyyy-MM-dd]"), null))
else
result.setEmbargoenddate(OafMapperUtils.field(a_date.get, null))
}
if (i_date.isDefined && i_date.get.isDefined) {
if(doi.startsWith("10.14457")) {
result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get,"[yyyy-MM-dd]"), null))
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get,"[yyyy-MM-dd]"), null))
if (doi.startsWith("10.14457")) {
result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get, "[yyyy-MM-dd]"), null))
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(i_date.get.get, "[yyyy-MM-dd]"), null))
}
else {
result.setDateofacceptance(OafMapperUtils.field(i_date.get.get, null))
@ -351,9 +352,9 @@ object DataciteToOAFTransformation {
}
}
else if (publication_year != null) {
if(doi.startsWith("10.14457")) {
result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year","[dd-MM-yyyy]"), null))
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year","[dd-MM-yyyy]"), null))
if (doi.startsWith("10.14457")) {
result.setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year", "[dd-MM-yyyy]"), null))
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(fix_thai_date(s"01-01-$publication_year", "[dd-MM-yyyy]"), null))
} else {
result.setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null))
@ -457,7 +458,7 @@ object DataciteToOAFTransformation {
JField("relatedIdentifier", JString(relatedIdentifier)) <- relIdentifier
} yield RelatedIdentifierType(relationType, relatedIdentifier, relatedIdentifierType)
relations = relations ::: generateRelations(rels,result.getId, if (i_date.isDefined && i_date.get.isDefined) i_date.get.get else null)
relations = relations ::: generateRelations(rels, result.getId, if (i_date.isDefined && i_date.get.isDefined) i_date.get.get else null)
}
if (relations != null && relations.nonEmpty) {
List(result) ::: relations
@ -466,7 +467,7 @@ object DataciteToOAFTransformation {
List(result)
}
private def generateRelations(rels: List[RelatedIdentifierType], id:String, date:String):List[Relation] = {
private def generateRelations(rels: List[RelatedIdentifierType], id: String, date: String): List[Relation] = {
rels
.filter(r =>
subRelTypeMapping.contains(r.relationType) && (
@ -484,12 +485,12 @@ object DataciteToOAFTransformation {
rel.setSubRelType(subRelType)
rel.setRelClass(r.relationType)
val dateProps:KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date)
val dateProps: KeyValue = OafMapperUtils.keyValue(DATE_RELATION_KEY, date)
rel.setProperties(List(dateProps).asJava)
rel.setSource(id)
rel.setTarget(DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier,r.relatedIdentifierType))
rel.setTarget(DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier, r.relatedIdentifierType))
rel.setCollectedfrom(List(DATACITE_COLLECTED_FROM).asJava)
rel.getCollectedfrom.asScala.map(c => c.getValue).toList
rel
@ -504,4 +505,4 @@ object DataciteToOAFTransformation {
}
}
}

View File

@ -7,6 +7,7 @@ import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import collection.JavaConverters._
object BioDBToOAF {
case class EBILinkItem(id: Long, links: String) {}

View File

@ -1,9 +1,9 @@
package eu.dnetlib.dhp.sx.bio
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Oaf
import BioDBToOAF.ScholixResolved
import eu.dnetlib.dhp.collection.CollectionUtils
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.sx.bio.BioDBToOAF.ScholixResolved
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@ -36,13 +36,13 @@ object SparkTransformBioDatabaseToOAF {
import spark.implicits._
database.toUpperCase() match {
case "UNIPROT" =>
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
case "PDB" =>
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
case "SCHOLIX" =>
spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
case "CROSSREF_LINKS" =>
spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))).flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null).write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -3,7 +3,7 @@ package eu.dnetlib.dhp.sx.bio.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.oaf.Result
import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PubMedToOaf}
import eu.dnetlib.dhp.sx.bio.pubmed._
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration

View File

@ -1,9 +1,8 @@
package eu.dnetlib.dhp.sx.bio.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal}
import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem
import eu.dnetlib.dhp.sx.bio.pubmed.PMJournal
import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal}
import org.apache.commons.io.IOUtils
import org.apache.http.client.config.RequestConfig
import org.apache.http.client.methods.HttpGet

View File

@ -1,11 +1,10 @@
package eu.dnetlib.dhp.sx.bio.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.collection.CollectionUtils
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.sx.bio.BioDBToOAF
import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem
import BioDBToOAF.EBILinkItem
import eu.dnetlib.dhp.collection.CollectionUtils
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@ -38,7 +37,7 @@ object SparkEBILinksToOaf {
ebLinks.flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
.filter(p => BioDBToOAF.EBITargetLinksFilter(p))
.flatMap(p => BioDBToOAF.convertEBILinksToOaf(p))
.flatMap(i=> CollectionUtils.fixRelations(i)).filter(i => i != null)
.flatMap(i => CollectionUtils.fixRelations(i)).filter(i => i != null)
.write.mode(SaveMode.Overwrite).save(targetPath)
}
}

View File

@ -4,7 +4,7 @@ import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.{GraphCleaningFunctions, IdentifierFactory, OafMapperUtils, PidType}
import eu.dnetlib.dhp.schema.oaf._
import scala.collection.JavaConverters._
import collection.JavaConverters._
import java.util.regex.Pattern
@ -22,10 +22,10 @@ object PubMedToOaf {
val collectedFrom: KeyValue = OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central")
/**
* Cleaning the DOI Applying regex in order to
* remove doi starting with URL
*
* @param doi input DOI
* @return cleaned DOI
*/
@ -49,7 +49,7 @@ object PubMedToOaf {
* starting from OAF instanceType value
*
* @param cobjQualifier OAF instance type
* @param vocabularies All dnet vocabularies
* @param vocabularies All dnet vocabularies
* @return the correct instance
*/
def createResult(cobjQualifier: Qualifier, vocabularies: VocabularyGroup): Result = {
@ -65,7 +65,7 @@ object PubMedToOaf {
}
/**
* Mapping the Pubmedjournal info into the OAF Journale
* Mapping the Pubmedjournal info into the OAF Journale
*
* @param j the pubmedJournal
* @return the OAF Journal
@ -91,9 +91,8 @@ object PubMedToOaf {
* Find vocabulary term into synonyms and term in the vocabulary
*
* @param vocabularyName the input vocabulary name
* @param vocabularies all the vocabularies
* @param term the term to find
*
* @param vocabularies all the vocabularies
* @param term the term to find
* @return the cleaned term value
*/
def getVocabularyTerm(vocabularyName: String, vocabularies: VocabularyGroup, term: String): Qualifier = {
@ -104,10 +103,9 @@ object PubMedToOaf {
/**
* Map the Pubmed Article into the OAF instance
* Map the Pubmed Article into the OAF instance
*
*
* @param article the pubmed articles
* @param article the pubmed articles
* @param vocabularies the vocabularies
* @return The OAF instance if the mapping did not fail
*/
@ -185,7 +183,6 @@ object PubMedToOaf {
//--------------------------------------------------------------------------------------
// RESULT MAPPING
//--------------------------------------------------------------------------------------
result.setDateofacceptance(OafMapperUtils.field(GraphCleaningFunctions.cleanDate(article.getDate), dataInfo))

View File

@ -8,6 +8,7 @@ import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, count}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.junit.jupiter.MockitoExtension
@ -17,7 +18,6 @@ import java.nio.file.{Files, Path}
import java.text.SimpleDateFormat
import java.util.Locale
import scala.io.Source
import org.junit.jupiter.api.Assertions._
@ExtendWith(Array(classOf[MockitoExtension]))
class DataciteToOAFTest extends AbstractVocabularyTest{