[scala-refactor] Module dhp-doiboost:

Moved all scala source into src/main/scala and src/test/scala
This commit is contained in:
Sandro La Bruzzo 2021-12-06 14:24:03 +01:00
parent bf880e2508
commit e9f285ec4d
23 changed files with 181 additions and 316 deletions

View File

@ -1,21 +1,19 @@
package eu.dnetlib.doiboost package eu.dnetlib.doiboost
import java.time.LocalDate import com.fasterxml.jackson.databind.ObjectMapper
import java.time.format.DateTimeFormatter
import eu.dnetlib.dhp.schema.action.AtomicAction import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{AccessRight, DataInfo, Dataset, Field, Instance, KeyValue, Oaf, OpenAccessRoute, Organization, Publication, Qualifier, Relation, Result, StructuredProperty} import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.StringUtils
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{getClosedAccessQualifier, getEmbargoedAccessQualifier, getUnknownQualifier}
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._

View File

@ -8,11 +8,12 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.SequenceFileOutputFormat import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
object SparkGenerateDOIBoostActionSet { object SparkGenerateDOIBoostActionSet {
val logger: Logger = LoggerFactory.getLogger(getClass) val logger: Logger = LoggerFactory.getLogger(getClass)
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
@ -33,53 +34,41 @@ object SparkGenerateDOIBoostActionSet {
implicit val mapEncoderAtomiAction: Encoder[AtomicAction[OafDataset]] = Encoders.kryo[AtomicAction[OafDataset]] implicit val mapEncoderAtomiAction: Encoder[AtomicAction[OafDataset]] = Encoders.kryo[AtomicAction[OafDataset]]
val dbPublicationPath = parser.get("dbPublicationPath") val dbPublicationPath = parser.get("dbPublicationPath")
val dbDatasetPath = parser.get("dbDatasetPath") val dbDatasetPath = parser.get("dbDatasetPath")
val crossRefRelation = parser.get("crossRefRelation") val crossRefRelation = parser.get("crossRefRelation")
val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath") val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath")
val dbOrganizationPath = parser.get("dbOrganizationPath") val dbOrganizationPath = parser.get("dbOrganizationPath")
val sequenceFilePath = parser.get("sFilePath") val sequenceFilePath = parser.get("sFilePath")
val asDataset = spark.read.load(dbDatasetPath).as[OafDataset] val asDataset = spark.read.load(dbDatasetPath).as[OafDataset]
.filter(p => p != null || p.getId != null) .filter(p => p != null || p.getId != null)
.map(d =>DoiBoostMappingUtil.fixResult(d)) .map(d => DoiBoostMappingUtil.fixResult(d))
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) .map(d => DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
val asPublication =spark.read.load(dbPublicationPath).as[Publication] val asPublication = spark.read.load(dbPublicationPath).as[Publication]
.filter(p => p != null || p.getId != null) .filter(p => p != null || p.getId != null)
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) .map(d => DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
val asOrganization = spark.read.load(dbOrganizationPath).as[Organization] val asOrganization = spark.read.load(dbOrganizationPath).as[Organization]
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) .map(d => DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
val asCRelation = spark.read.load(crossRefRelation).as[Relation] val asCRelation = spark.read.load(crossRefRelation).as[Relation]
.filter(r => r!= null && r.getSource != null && r.getTarget != null) .filter(r => r != null && r.getSource != null && r.getTarget != null)
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) .map(d => DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
val asRelAffiliation = spark.read.load(dbaffiliationRelationPath).as[Relation] val asRelAffiliation = spark.read.load(dbaffiliationRelationPath).as[Relation]
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) .map(d => DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
val d: Dataset[(String, String)] = asDataset.union(asPublication).union(asOrganization).union(asCRelation).union(asRelAffiliation) val d: Dataset[(String, String)] = asDataset.union(asPublication).union(asOrganization).union(asCRelation).union(asRelAffiliation)
d.rdd.repartition(6000).map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$sequenceFilePath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text, Text]], classOf[GzipCodec])
d.rdd.repartition(6000).map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$sequenceFilePath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
} }

View File

@ -9,28 +9,26 @@ import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.col import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JField, JObject, JString,JArray} import org.json4s.JsonAST.{JField, JObject, JString}
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkGenerateDoiBoost { object SparkGenerateDoiBoost {
def extractIdGRID(input:String):List[(String,String)] = { def extractIdGRID(input: String): List[(String, String)] = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: org.json4s.JValue = parse(input) lazy val json: org.json4s.JValue = parse(input)
val id:String = (json \ "id").extract[String] val id: String = (json \ "id").extract[String]
val grids:List[String] = for { val grids: List[String] = for {
JObject(pid) <- json \ "pid" JObject(pid) <- json \ "pid"
JField("qualifier", JObject(qualifier)) <- pid JField("qualifier", JObject(qualifier)) <- pid
JField("classid", JString(classid)) <-qualifier JField("classid", JString(classid)) <- qualifier
JField("value", JString(vl)) <- pid JField("value", JString(vl)) <- pid
if classid == "GRID" if classid == "GRID"
} yield vl } yield vl
@ -38,7 +36,6 @@ object SparkGenerateDoiBoost {
} }
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val logger: Logger = LoggerFactory.getLogger(getClass) val logger: Logger = LoggerFactory.getLogger(getClass)
@ -73,7 +70,7 @@ object SparkGenerateDoiBoost {
if (a != null && a._2 != null) { if (a != null && a._2 != null) {
b.mergeFrom(a._2) b.mergeFrom(a._2)
b.setId(a._1) b.setId(a._1)
val authors =AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor) val authors = AuthorMerger.mergeAuthor(b.getAuthor, a._2.getAuthor)
b.setAuthor(authors) b.setAuthor(authors)
return b return b
} }
@ -87,11 +84,11 @@ object SparkGenerateDoiBoost {
return b2 return b2
} }
else { else {
if (b2 != null ) { if (b2 != null) {
b1.mergeFrom(b2) b1.mergeFrom(b2)
val authors =AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor) val authors = AuthorMerger.mergeAuthor(b1.getAuthor, b2.getAuthor)
b1.setAuthor(authors) b1.setAuthor(authors)
if (b2.getId!= null && b2.getId.nonEmpty) if (b2.getId != null && b2.getId.nonEmpty)
b1.setId(b2.getId) b1.setId(b2.getId)
return b1 return b1
} }
@ -118,10 +115,9 @@ object SparkGenerateDoiBoost {
val crossrefPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p => (p.getId, p)) val crossrefPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/crossrefPublication").as[Publication].map(p => (p.getId, p))
val uwPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/uwPublication").as[Publication].map(p => (p.getId, p)) val uwPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/uwPublication").as[Publication].map(p => (p.getId, p))
def applyMerge(item:((String, Publication), (String, Publication))) : Publication = def applyMerge(item: ((String, Publication), (String, Publication))): Publication = {
{
val crossrefPub = item._1._2 val crossrefPub = item._1._2
if (item._2!= null) { if (item._2 != null) {
val otherPub = item._2._2 val otherPub = item._2._2
if (otherPub != null) { if (otherPub != null) {
crossrefPub.mergeFrom(otherPub) crossrefPub.mergeFrom(otherPub)
@ -130,6 +126,7 @@ object SparkGenerateDoiBoost {
} }
crossrefPub crossrefPub
} }
crossrefPublication.joinWith(uwPublication, crossrefPublication("_1").equalTo(uwPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/firstJoin") crossrefPublication.joinWith(uwPublication, crossrefPublication("_1").equalTo(uwPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/firstJoin")
logger.info("Phase 3) Join Result with ORCID") logger.info("Phase 3) Join Result with ORCID")
val fj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p)) val fj: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/firstJoin").as[Publication].map(p => (p.getId, p))
@ -143,9 +140,9 @@ object SparkGenerateDoiBoost {
sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication") sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication")
val doiBoostPublication: Dataset[(String,Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication].filter(p=>DoiBoostMappingUtil.filterPublication(p)).map(DoiBoostMappingUtil.toISSNPair)(tupleForJoinEncoder) val doiBoostPublication: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication].filter(p => DoiBoostMappingUtil.filterPublication(p)).map(DoiBoostMappingUtil.toISSNPair)(tupleForJoinEncoder)
val hostedByDataset : Dataset[(String, HostedByItemType)] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath).map(DoiBoostMappingUtil.toHostedByItem)) val hostedByDataset: Dataset[(String, HostedByItemType)] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath).map(DoiBoostMappingUtil.toHostedByItem))
doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left") doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left")
@ -164,21 +161,20 @@ object SparkGenerateDoiBoost {
val paperAffiliation = spark.read.load(paperAffiliationPath).select(col("AffiliationId").alias("affId"), col("PaperId")) val paperAffiliation = spark.read.load(paperAffiliationPath).select(col("AffiliationId").alias("affId"), col("PaperId"))
val a:Dataset[DoiBoostAffiliation] = paperAffiliation val a: Dataset[DoiBoostAffiliation] = paperAffiliation
.joinWith(affiliation, paperAffiliation("affId").equalTo(affiliation("AffiliationId"))) .joinWith(affiliation, paperAffiliation("affId").equalTo(affiliation("AffiliationId")))
.select(col("_1.PaperId"), col("_2.AffiliationId"), col("_2.GridId"), col("_2.OfficialPage"), col("_2.DisplayName")).as[DoiBoostAffiliation] .select(col("_1.PaperId"), col("_2.AffiliationId"), col("_2.GridId"), col("_2.OfficialPage"), col("_2.DisplayName")).as[DoiBoostAffiliation]
val magPubs: Dataset[(String, Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublicationFiltered").as[Publication]
val magPubs:Dataset[(String,Publication)]= spark.read.load(s"$workingDirPath/doiBoostPublicationFiltered").as[Publication] .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p))(tupleForJoinEncoder).filter(s => s._1 != null)
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p))(tupleForJoinEncoder).filter(s =>s._1!= null )
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).flatMap(item => { magPubs.joinWith(a, magPubs("_1").equalTo(a("PaperId"))).flatMap(item => {
val pub:Publication = item._1._2 val pub: Publication = item._1._2
val affiliation = item._2 val affiliation = item._2
val affId:String = if (affiliation.GridId.isDefined) s"unresolved::grid::${affiliation.GridId.get.toLowerCase}" else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString) val affId: String = if (affiliation.GridId.isDefined) s"unresolved::grid::${affiliation.GridId.get.toLowerCase}" else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString)
val r:Relation = new Relation val r: Relation = new Relation
r.setSource(pub.getId) r.setSource(pub.getId)
r.setTarget(affId) r.setTarget(affId)
r.setRelType(ModelConstants.RESULT_ORGANIZATION) r.setRelType(ModelConstants.RESULT_ORGANIZATION)
@ -186,7 +182,7 @@ object SparkGenerateDoiBoost {
r.setSubRelType(ModelConstants.AFFILIATION) r.setSubRelType(ModelConstants.AFFILIATION)
r.setDataInfo(pub.getDataInfo) r.setDataInfo(pub.getDataInfo)
r.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava) r.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava)
val r1:Relation = new Relation val r1: Relation = new Relation
r1.setTarget(pub.getId) r1.setTarget(pub.getId)
r1.setSource(affId) r1.setSource(affId)
r1.setRelType(ModelConstants.RESULT_ORGANIZATION) r1.setRelType(ModelConstants.RESULT_ORGANIZATION)
@ -198,33 +194,31 @@ object SparkGenerateDoiBoost {
})(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved") })(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved")
val unresolvedRels: Dataset[(String, Relation)] = spark.read.load(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved").as[Relation].map(r => {
val unresolvedRels:Dataset[(String, Relation)] = spark.read.load(s"$workingDirPath/doiBoostPublicationAffiliation_unresolved").as[Relation].map(r => {
if (r.getSource.startsWith("unresolved")) if (r.getSource.startsWith("unresolved"))
(r.getSource, r) (r.getSource, r)
else if (r.getTarget.startsWith("unresolved")) else if (r.getTarget.startsWith("unresolved"))
(r.getTarget,r) (r.getTarget, r)
else else
("resolved", r) ("resolved", r)
})(Encoders.tuple(Encoders.STRING, mapEncoderRel)) })(Encoders.tuple(Encoders.STRING, mapEncoderRel))
val openaireOrganization:Dataset[(String,String)] = spark.read.text(openaireOrganizationPath).as[String].flatMap(s => extractIdGRID(s)).groupByKey(_._2).reduceGroups((x,y) => if (x != null) x else y ).map(_._2) val openaireOrganization: Dataset[(String, String)] = spark.read.text(openaireOrganizationPath).as[String].flatMap(s => extractIdGRID(s)).groupByKey(_._2).reduceGroups((x, y) => if (x != null) x else y).map(_._2)
unresolvedRels.joinWith(openaireOrganization,unresolvedRels("_1").equalTo(openaireOrganization("_2"))) unresolvedRels.joinWith(openaireOrganization, unresolvedRels("_1").equalTo(openaireOrganization("_2")))
.map { x => .map { x =>
val currentRels = x._1._2 val currentRels = x._1._2
val currentOrgs = x._2 val currentOrgs = x._2
if (currentOrgs!= null) if (currentOrgs != null)
if(currentRels.getSource.startsWith("unresolved")) if (currentRels.getSource.startsWith("unresolved"))
currentRels.setSource(currentOrgs._1) currentRels.setSource(currentOrgs._1)
else else
currentRels.setTarget(currentOrgs._1) currentRels.setTarget(currentOrgs._1)
currentRels currentRels
}.filter(r=> !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation") }.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved")).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation")
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).map( item => { magPubs.joinWith(a, magPubs("_1").equalTo(a("PaperId"))).map(item => {
val affiliation = item._2 val affiliation = item._2
if (affiliation.GridId.isEmpty) { if (affiliation.GridId.isEmpty) {
val o = new Organization val o = new Organization
@ -241,7 +235,7 @@ object SparkGenerateDoiBoost {
} }
else else
null null
}).filter(o=> o!=null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization") }).filter(o => o != null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization")
} }
} }

View File

@ -4,20 +4,19 @@ import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils} import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils}
import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{decideAccessRight, _} import eu.dnetlib.doiboost.DoiBoostMappingUtil
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import org.apache.commons.lang.StringUtils import org.apache.commons.lang.StringUtils
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.JsonAST.{JValue, _} import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._ import org.json4s.jackson.JsonMethods._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import java.util
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable
import scala.util.matching.Regex import scala.util.matching.Regex
import java.util
import eu.dnetlib.doiboost.DoiBoostMappingUtil
case class CrossrefDT(doi: String, json:String, timestamp: Long) {} case class CrossrefDT(doi: String, json:String, timestamp: Long) {}

View File

@ -6,7 +6,7 @@ import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.{IntWritable, Text} import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession}
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
@ -17,12 +17,12 @@ object CrossrefDataset {
val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass) val logger: Logger = LoggerFactory.getLogger(SparkMapDumpIntoOAF.getClass)
def to_item(input:String):CrossrefDT = { def to_item(input: String): CrossrefDT = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input) lazy val json: json4s.JValue = parse(input)
val ts:Long = (json \ "indexed" \ "timestamp").extract[Long] val ts: Long = (json \ "indexed" \ "timestamp").extract[Long]
val doi:String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) val doi: String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String])
CrossrefDT(doi, input, ts) CrossrefDT(doi, input, ts)
} }
@ -30,7 +30,6 @@ object CrossrefDataset {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf() val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json"))) val parser = new ArgumentApplicationParser(IOUtils.toString(CrossrefDataset.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_to_dataset_params.json")))
parser.parseArgument(args) parser.parseArgument(args)
@ -54,7 +53,7 @@ object CrossrefDataset {
return b return b
if(a.timestamp >b.timestamp) { if (a.timestamp > b.timestamp) {
return a return a
} }
b b
@ -66,7 +65,7 @@ object CrossrefDataset {
if (a == null) if (a == null)
return b return b
if(a.timestamp >b.timestamp) { if (a.timestamp > b.timestamp) {
return a return a
} }
b b
@ -79,20 +78,20 @@ object CrossrefDataset {
override def finish(reduction: CrossrefDT): CrossrefDT = reduction override def finish(reduction: CrossrefDT): CrossrefDT = reduction
} }
val workingPath:String = parser.get("workingPath") val workingPath: String = parser.get("workingPath")
val main_ds:Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT] val main_ds: Dataset[CrossrefDT] = spark.read.load(s"$workingPath/crossref_ds").as[CrossrefDT]
val update = val update =
spark.createDataset(spark.sparkContext.sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text]) spark.createDataset(spark.sparkContext.sequenceFile(s"$workingPath/index_update", classOf[IntWritable], classOf[Text])
.map(i =>CrossrefImporter.decompressBlob(i._2.toString)) .map(i => CrossrefImporter.decompressBlob(i._2.toString))
.map(i =>to_item(i))) .map(i => to_item(i)))
main_ds.union(update).groupByKey(_.doi) main_ds.union(update).groupByKey(_.doi)
.agg(crossrefAggregator.toColumn) .agg(crossrefAggregator.toColumn)
.map(s=>s._2) .map(s => s._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/crossref_ds_updated") .write.mode(SaveMode.Overwrite).save(s"$workingPath/crossref_ds_updated")
} }

View File

@ -2,17 +2,12 @@ package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.doiboost.DoiBoostMappingUtil import eu.dnetlib.doiboost.DoiBoostMappingUtil
import eu.dnetlib.doiboost.crossref.CrossrefDataset.to_item
import eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries.getClass
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.JsonAST.JArray import org.json4s.jackson.JsonMethods.parse
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source import scala.io.Source
@ -24,11 +19,10 @@ object GenerateCrossrefDataset {
implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT] implicit val mrEncoder: Encoder[CrossrefDT] = Encoders.kryo[CrossrefDT]
def crossrefElement(meta: String): CrossrefDT = { def crossrefElement(meta: String): CrossrefDT = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(meta) lazy val json: json4s.JValue = parse(meta)
val doi:String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String]) val doi: String = DoiBoostMappingUtil.normalizeDoi((json \ "DOI").extract[String])
val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long] val timestamp: Long = (json \ "indexed" \ "timestamp").extract[Long]
CrossrefDT(doi, meta, timestamp) CrossrefDT(doi, meta, timestamp)
@ -51,14 +45,14 @@ object GenerateCrossrefDataset {
import spark.implicits._ import spark.implicits._
val tmp : RDD[String] = sc.textFile(sourcePath,6000) val tmp: RDD[String] = sc.textFile(sourcePath, 6000)
spark.createDataset(tmp) spark.createDataset(tmp)
.map(entry => crossrefElement(entry)) .map(entry => crossrefElement(entry))
.write.mode(SaveMode.Overwrite).save(targetPath) .write.mode(SaveMode.Overwrite).save(targetPath)
// .map(meta => crossrefElement(meta)) // .map(meta => crossrefElement(meta))
// .toDS.as[CrossrefDT] // .toDS.as[CrossrefDT]
// .write.mode(SaveMode.Overwrite).save(targetPath) // .write.mode(SaveMode.Overwrite).save(targetPath)
} }

View File

@ -4,10 +4,8 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf import eu.dnetlib.dhp.schema.oaf
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset} import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}

View File

@ -2,8 +2,8 @@ package eu.dnetlib.doiboost.crossref
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.hadoop.io.compress.GzipCodec import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.JsonAST.JArray import org.json4s.JsonAST.JArray
@ -17,9 +17,7 @@ object UnpackCrtossrefEntries {
val log: Logger = LoggerFactory.getLogger(UnpackCrtossrefEntries.getClass) val log: Logger = LoggerFactory.getLogger(UnpackCrtossrefEntries.getClass)
def extractDump(input: String): List[String] = {
def extractDump(input:String):List[String] = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
lazy val json: json4s.JValue = parse(input) lazy val json: json4s.JValue = parse(input)
@ -30,7 +28,6 @@ object UnpackCrtossrefEntries {
} }
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val conf = new SparkConf val conf = new SparkConf
val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString) val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString)
@ -45,7 +42,7 @@ object UnpackCrtossrefEntries {
.getOrCreate() .getOrCreate()
val sc: SparkContext = spark.sparkContext val sc: SparkContext = spark.sparkContext
sc.wholeTextFiles(sourcePath,6000).flatMap(d =>extractDump(d._2)) sc.wholeTextFiles(sourcePath, 6000).flatMap(d => extractDump(d._2))
.saveAsTextFile(targetPath, classOf[GzipCodec]) .saveAsTextFile(targetPath, classOf[GzipCodec])

View File

@ -5,10 +5,10 @@ import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, StructuredProperty} import eu.dnetlib.dhp.schema.oaf.{Instance, Journal, Publication, StructuredProperty}
import eu.dnetlib.doiboost.DoiBoostMappingUtil import eu.dnetlib.doiboost.DoiBoostMappingUtil
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable import scala.collection.mutable

View File

@ -3,8 +3,8 @@ package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types._ import org.apache.spark.sql.types._
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
object SparkImportMagIntoDataset { object SparkImportMagIntoDataset {
@ -24,13 +24,13 @@ object SparkImportMagIntoDataset {
"Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Iso3166Code:string", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), "Affiliations" -> Tuple2("mag/Affiliations.txt", Seq("AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "GridId:string", "OfficialPage:string", "WikiPage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Iso3166Code:string", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
"AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")), "AuthorExtendedAttributes" -> Tuple2("mag/AuthorExtendedAttributes.txt", Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")),
"Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), "Authors" -> Tuple2("mag/Authors.txt", Seq("AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "LastKnownAffiliationId:long?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")), "ConferenceInstances" -> Tuple2("mag/ConferenceInstances.txt", Seq("ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string", "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string", "StartDate:DateTime?", "EndDate:DateTime?", "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?", "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "Latitude:float?", "Longitude:float?", "CreatedDate:DateTime")),
"ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), "ConferenceSeries" -> Tuple2("mag/ConferenceSeries.txt", Seq("ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")), "EntityRelatedEntities" -> Tuple2("advanced/EntityRelatedEntities.txt", Seq("EntityId:long", "EntityType:string", "RelatedEntityId:long", "RelatedEntityType:string", "RelatedType:int", "Score:float")),
"FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")), "FieldOfStudyChildren" -> Tuple2("advanced/FieldOfStudyChildren.txt", Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")),
"FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")), "FieldOfStudyExtendedAttributes" -> Tuple2("advanced/FieldOfStudyExtendedAttributes.txt", Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")),
"FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")), "FieldsOfStudy" -> Tuple2("advanced/FieldsOfStudy.txt", Seq("FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "MainType:string", "Level:int", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long" ,"CitationCount:long", "CreatedDate:DateTime")), "Journals" -> Tuple2("mag/Journals.txt", Seq("JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string", "Issn:string", "Publisher:string", "Webpage:string", "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long", "CreatedDate:DateTime")),
"PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")), "PaperAbstractsInvertedIndex" -> Tuple2("nlp/PaperAbstractsInvertedIndex.txt.*", Seq("PaperId:long", "IndexedAbstract:string")),
"PaperAuthorAffiliations" -> Tuple2("mag/PaperAuthorAffiliations.txt", Seq("PaperId:long", "AuthorId:long", "AffiliationId:long?", "AuthorSequenceNumber:uint", "OriginalAuthor:string", "OriginalAffiliation:string")), "PaperAuthorAffiliations" -> Tuple2("mag/PaperAuthorAffiliations.txt", Seq("PaperId:long", "AuthorId:long", "AffiliationId:long?", "AuthorSequenceNumber:uint", "OriginalAuthor:string", "OriginalAffiliation:string")),
"PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")), "PaperCitationContexts" -> Tuple2("nlp/PaperCitationContexts.txt", Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")),
@ -75,7 +75,6 @@ object SparkImportMagIntoDataset {
.master(parser.get("master")).getOrCreate() .master(parser.get("master")).getOrCreate()
stream.foreach { case (k, v) => stream.foreach { case (k, v) =>
val s: StructType = getSchema(k) val s: StructType = getSchema(k)
val df = spark.read val df = spark.read

View File

@ -5,22 +5,19 @@ import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.doiboost.DoiBoostMappingUtil import eu.dnetlib.doiboost.DoiBoostMappingUtil
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD import org.apache.spark.sql.functions.{col, collect_list, struct}
import org.apache.spark.sql.functions._
import org.apache.spark.sql._ import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
object SparkProcessMAG { object SparkProcessMAG {
def getDistinctResults (d:Dataset[MagPapers]):Dataset[MagPapers]={ def getDistinctResults(d: Dataset[MagPapers]): Dataset[MagPapers] = {
d.where(col("Doi").isNotNull) d.where(col("Doi").isNotNull)
.groupByKey(mp => DoiBoostMappingUtil.normalizeDoi(mp.Doi))(Encoders.STRING) .groupByKey(mp => DoiBoostMappingUtil.normalizeDoi(mp.Doi))(Encoders.STRING)
.reduceGroups((p1:MagPapers,p2:MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1,p2)) .reduceGroups((p1: MagPapers, p2: MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1, p2))
.map(_._2)(Encoders.product[MagPapers]) .map(_._2)(Encoders.product[MagPapers])
.map(mp => { .map(mp => {
new MagPapers(mp.PaperId, mp.Rank, DoiBoostMappingUtil.normalizeDoi(mp.Doi), MagPapers(mp.PaperId, mp.Rank, DoiBoostMappingUtil.normalizeDoi(mp.Doi),
mp.DocType, mp.PaperTitle, mp.OriginalTitle, mp.DocType, mp.PaperTitle, mp.OriginalTitle,
mp.BookTitle, mp.Year, mp.Date, mp.Publisher: String, mp.BookTitle, mp.Year, mp.Date, mp.Publisher: String,
mp.JournalId, mp.ConferenceSeriesId, mp.ConferenceInstanceId, mp.JournalId, mp.ConferenceSeriesId, mp.ConferenceInstanceId,
@ -98,13 +95,13 @@ object SparkProcessMAG {
var magPubs: Dataset[(String, Publication)] = var magPubs: Dataset[(String, Publication)] =
spark.read.load(s"$workingPath/merge_step_2").as[Publication] spark.read.load(s"$workingPath/merge_step_2").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)] .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
val conference = spark.read.load(s"$sourcePath/ConferenceInstances") val conference = spark.read.load(s"$sourcePath/ConferenceInstances")
.select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate",$"EndDate" ) .select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate", $"EndDate")
val conferenceInstance = conference.joinWith(papers, papers("ConferenceInstanceId").equalTo(conference("ci"))) val conferenceInstance = conference.joinWith(papers, papers("ConferenceInstanceId").equalTo(conference("ci")))
.select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate",$"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance] .select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate", $"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance]
magPubs.joinWith(conferenceInstance, col("_1").equalTo(conferenceInstance("PaperId")), "left") magPubs.joinWith(conferenceInstance, col("_1").equalTo(conferenceInstance("PaperId")), "left")
@ -122,7 +119,7 @@ object SparkProcessMAG {
magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left") magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left")
.map(item => ConversionUtil.updatePubsWithDescription(item) .map(item => ConversionUtil.updatePubsWithDescription(item)
).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4") ).write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4")
logger.info("Phase 7) Enrich Publication with FieldOfStudy") logger.info("Phase 7) Enrich Publication with FieldOfStudy")
@ -148,11 +145,10 @@ object SparkProcessMAG {
spark.read.load(s"$workingPath/mag_publication").as[Publication] spark.read.load(s"$workingPath/mag_publication").as[Publication]
.filter(p => p.getId != null) .filter(p => p.getId != null)
.groupByKey(p => p.getId) .groupByKey(p => p.getId)
.reduceGroups((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b)) .reduceGroups((a: Publication, b: Publication) => ConversionUtil.mergePublication(a, b))
.map(_._2) .map(_._2)
.write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication") .write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication")
} }
} }

View File

@ -4,17 +4,16 @@ import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication} import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication}
import eu.dnetlib.dhp.schema.orcid.{AuthorData, OrcidDOI}
import eu.dnetlib.doiboost.DoiBoostMappingUtil import eu.dnetlib.doiboost.DoiBoostMappingUtil
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{createSP, generateDataInfo} import eu.dnetlib.doiboost.DoiBoostMappingUtil.{createSP, generateDataInfo}
import org.apache.commons.lang.StringUtils import org.apache.commons.lang.StringUtils
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.JsonAST._ import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._ import org.json4s.jackson.JsonMethods._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
case class ORCIDItem(doi:String, authors:List[OrcidAuthor]){} case class ORCIDItem(doi:String, authors:List[OrcidAuthor]){}

View File

@ -11,10 +11,10 @@ object SparkConvertORCIDToOAF {
val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
def run(spark:SparkSession, workingPath:String, targetPath:String) :Unit = { def run(spark: SparkSession, workingPath: String, targetPath: String): Unit = {
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
import spark.implicits._ import spark.implicits._
val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem] val dataset: Dataset[ORCIDItem] = spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem]
logger.info("Converting ORCID to OAF") logger.info("Converting ORCID to OAF")
dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath) dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath)
@ -35,8 +35,8 @@ object SparkConvertORCIDToOAF {
val workingPath = parser.get("workingPath") val workingPath = parser.get("workingPath")
val targetPath = parser.get("targetPath") val targetPath = parser.get("targetPath")
run(spark,workingPath, targetPath) run(spark, workingPath, targetPath)
} }
} }

View File

@ -1,48 +1,45 @@
package eu.dnetlib.doiboost.orcid package eu.dnetlib.doiboost.orcid
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.oa.merge.AuthorMerger
import eu.dnetlib.dhp.schema.oaf.Publication import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._ import org.apache.spark.sql.functions.{col, collect_list}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
object SparkPreprocessORCID { object SparkPreprocessORCID {
val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass) val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
def fixORCIDItem(item :ORCIDItem):ORCIDItem = { def fixORCIDItem(item: ORCIDItem): ORCIDItem = {
ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList) ORCIDItem(item.doi, item.authors.groupBy(_.oid).map(_._2.head).toList)
} }
def run(spark:SparkSession,sourcePath:String,workingPath:String):Unit = { def run(spark: SparkSession, sourcePath: String, workingPath: String): Unit = {
import spark.implicits._ import spark.implicits._
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s)) val inputRDD: RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s != null).filter(s => ORCIDToOAF.authorValid(s))
spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author") spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author")
val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null) val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s != null)
spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works") spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works")
val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor] val authors: Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor]
val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork] val works: Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork]
works.joinWith(authors, authors("oid").equalTo(works("oid"))) works.joinWith(authors, authors("oid").equalTo(works("oid")))
.map(i =>{ .map(i => {
val doi = i._1.doi val doi = i._1.doi
val author = i._2 val author = i._2
(doi, author) (doi, author)
}).groupBy(col("_1").alias("doi")) }).groupBy(col("_1").alias("doi"))
.agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem] .agg(collect_list(col("_2")).alias("authors")).as[ORCIDItem]
.map(s => fixORCIDItem(s)) .map(s => fixORCIDItem(s))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor") .write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor")
@ -67,4 +64,4 @@ object SparkPreprocessORCID {
} }
} }

View File

@ -1,16 +1,14 @@
package eu.dnetlib.doiboost.uw package eu.dnetlib.doiboost.uw
import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Publication import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF import eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
object SparkMapUnpayWallToOAF { object SparkMapUnpayWallToOAF {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
@ -32,11 +30,11 @@ object SparkMapUnpayWallToOAF {
val sourcePath = parser.get("sourcePath") val sourcePath = parser.get("sourcePath")
val targetPath = parser.get("targetPath") val targetPath = parser.get("targetPath")
val inputRDD:RDD[String] = spark.sparkContext.textFile(s"$sourcePath") val inputRDD: RDD[String] = spark.sparkContext.textFile(s"$sourcePath")
logger.info("Converting UnpayWall to OAF") logger.info("Converting UnpayWall to OAF")
val d:Dataset[Publication] = spark.createDataset(inputRDD.map(UnpayWallToOAF.convertToOAF).filter(p=>p!=null)).as[Publication] val d: Dataset[Publication] = spark.createDataset(inputRDD.map(UnpayWallToOAF.convertToOAF).filter(p => p != null)).as[Publication]
d.write.mode(SaveMode.Overwrite).save(targetPath) d.write.mode(SaveMode.Overwrite).save(targetPath)
} }

View File

@ -4,14 +4,13 @@ import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory
import eu.dnetlib.dhp.schema.oaf.{AccessRight, Instance, OpenAccessRoute, Publication} import eu.dnetlib.dhp.schema.oaf.{AccessRight, Instance, OpenAccessRoute, Publication}
import eu.dnetlib.doiboost.DoiBoostMappingUtil import eu.dnetlib.doiboost.DoiBoostMappingUtil
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import eu.dnetlib.doiboost.DoiBoostMappingUtil._
import eu.dnetlib.doiboost.uw.UnpayWallToOAF.get_unpaywall_color

View File

@ -1,70 +0,0 @@
package eu.dnetlib.dhp.doiboost
import eu.dnetlib.dhp.schema.oaf.{Publication, Dataset => OafDataset}
import eu.dnetlib.doiboost.{DoiBoostMappingUtil, HostedByItemType}
import eu.dnetlib.doiboost.SparkGenerateDoiBoost.getClass
import eu.dnetlib.doiboost.mag.ConversionUtil
import eu.dnetlib.doiboost.orcid.ORCIDElement
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.junit.jupiter.api.Test
import scala.io.Source
class DoiBoostHostedByMapTest {
// @Test
// def testMerge():Unit = {
// val conf: SparkConf = new SparkConf()
// val spark: SparkSession =
// SparkSession
// .builder()
// .config(conf)
// .appName(getClass.getSimpleName)
// .master("local[*]").getOrCreate()
//
//
//
// implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
// implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
// implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
//
//
// import spark.implicits._
// val dataset:RDD[String]= spark.sparkContext.textFile("/home/sandro/Downloads/hbMap.gz")
//
//
// val hbMap:Dataset[(String, HostedByItemType)] =spark.createDataset(dataset.map(DoiBoostMappingUtil.toHostedByItem))
//
//
// hbMap.show()
//
//
//
//
//
//
//
//
//
//
// }
@Test
def idDSGeneration():Unit = {
val s ="doajarticles::0066-782X"
println(DoiBoostMappingUtil.generateDSId(s))
}
}

View File

@ -0,0 +1,20 @@
package eu.dnetlib.dhp.doiboost
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import org.junit.jupiter.api.Test
class DoiBoostHostedByMapTest {
@Test
def idDSGeneration():Unit = {
val s ="doajarticles::0066-782X"
println(DoiBoostMappingUtil.generateDSId(s))
}
}

View File

@ -1,7 +1,8 @@
package eu.dnetlib.doiboost.crossref package eu.dnetlib.dhp.doiboost.crossref
import eu.dnetlib.dhp.schema.oaf._ import eu.dnetlib.dhp.schema.oaf._
import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.doiboost.crossref.Crossref2Oaf
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig} import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
@ -21,9 +22,9 @@ class CrossrefMappingTest {
@Test @Test
def testFunderRelationshipsMapping(): Unit = { def testFunderRelationshipsMapping(): Unit = {
val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString val template = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/article_funder_template.json")).mkString
val funder_doi = Source.fromInputStream(getClass.getResourceAsStream("funder_doi")).mkString val funder_doi = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/funder_doi")).mkString
val funder_name = Source.fromInputStream(getClass.getResourceAsStream("funder_doi")).mkString val funder_name = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/funder_doi")).mkString
for (line <- funder_doi.lines) { for (line <- funder_doi.lines) {
@ -72,7 +73,7 @@ class CrossrefMappingTest {
@Test @Test
def testOrcidID() :Unit = { def testOrcidID() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("orcid_data.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/orcid_data.json")).mkString
assertNotNull(json) assertNotNull(json)
@ -93,7 +94,7 @@ class CrossrefMappingTest {
@Test @Test
def testEmptyTitle() :Unit = { def testEmptyTitle() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/empty_title.json")).mkString
assertNotNull(json) assertNotNull(json)
@ -115,7 +116,7 @@ class CrossrefMappingTest {
@Test @Test
def testPeerReviewed(): Unit = { def testPeerReviewed(): Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("prwTest.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/prwTest.json")).mkString
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
assertNotNull(json) assertNotNull(json)
@ -156,7 +157,7 @@ class CrossrefMappingTest {
@Test @Test
def testJournalRelation(): Unit = { def testJournalRelation(): Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("awardTest.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/awardTest.json")).mkString
assertNotNull(json) assertNotNull(json)
assertFalse(json.isEmpty) assertFalse(json.isEmpty)
@ -177,7 +178,7 @@ class CrossrefMappingTest {
@Test @Test
def testConvertBookFromCrossRef2Oaf(): Unit = { def testConvertBookFromCrossRef2Oaf(): Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("book.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/book.json")).mkString
assertNotNull(json) assertNotNull(json)
assertFalse(json.isEmpty); assertFalse(json.isEmpty);
@ -233,7 +234,7 @@ class CrossrefMappingTest {
@Test @Test
def testConvertPreprintFromCrossRef2Oaf(): Unit = { def testConvertPreprintFromCrossRef2Oaf(): Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("preprint.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/preprint.json")).mkString
assertNotNull(json) assertNotNull(json)
assertFalse(json.isEmpty); assertFalse(json.isEmpty);
@ -291,7 +292,7 @@ class CrossrefMappingTest {
@Test @Test
def testConvertDatasetFromCrossRef2Oaf(): Unit = { def testConvertDatasetFromCrossRef2Oaf(): Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("dataset.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/dataset.json")).mkString
assertNotNull(json) assertNotNull(json)
assertFalse(json.isEmpty); assertFalse(json.isEmpty);
@ -332,7 +333,7 @@ class CrossrefMappingTest {
@Test @Test
def testConvertArticleFromCrossRef2Oaf(): Unit = { def testConvertArticleFromCrossRef2Oaf(): Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("article.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/article.json")).mkString
assertNotNull(json) assertNotNull(json)
assertFalse(json.isEmpty); assertFalse(json.isEmpty);
@ -400,7 +401,7 @@ class CrossrefMappingTest {
@Test @Test
def testSetDateOfAcceptanceCrossRef2Oaf(): Unit = { def testSetDateOfAcceptanceCrossRef2Oaf(): Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("dump_file.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/dump_file.json")).mkString
assertNotNull(json) assertNotNull(json)
assertFalse(json.isEmpty); assertFalse(json.isEmpty);
@ -415,55 +416,12 @@ class CrossrefMappingTest {
assert(items.size == 1) assert(items.size == 1)
val result: Result = items.head.asInstanceOf[Publication] val result: Result = items.head.asInstanceOf[Publication]
assertNotNull(result) assertNotNull(result)
logger.info(mapper.writeValueAsString(result)); logger.info(mapper.writeValueAsString(result));
// assertNotNull(result.getDataInfo, "Datainfo test not null Failed");
// assertNotNull(
// result.getDataInfo.getProvenanceaction,
// "DataInfo/Provenance test not null Failed");
// assertFalse(
// result.getDataInfo.getProvenanceaction.getClassid.isEmpty,
// "DataInfo/Provenance/classId test not null Failed");
// assertFalse(
// result.getDataInfo.getProvenanceaction.getClassname.isEmpty,
// "DataInfo/Provenance/className test not null Failed");
// assertFalse(
// result.getDataInfo.getProvenanceaction.getSchemeid.isEmpty,
// "DataInfo/Provenance/SchemeId test not null Failed");
// assertFalse(
// result.getDataInfo.getProvenanceaction.getSchemename.isEmpty,
// "DataInfo/Provenance/SchemeName test not null Failed");
//
// assertNotNull(result.getCollectedfrom, "CollectedFrom test not null Failed");
// assertFalse(result.getCollectedfrom.isEmpty);
//
// val collectedFromList = result.getCollectedfrom.asScala
// assert(collectedFromList.exists(c => c.getKey.equalsIgnoreCase("10|openaire____::081b82f96300b6a6e3d282bad31cb6e2")), "Wrong collected from assertion")
//
// assert(collectedFromList.exists(c => c.getValue.equalsIgnoreCase("crossref")), "Wrong collected from assertion")
//
//
// val relevantDates = result.getRelevantdate.asScala
//
// assert(relevantDates.exists(d => d.getQualifier.getClassid.equalsIgnoreCase("created")), "Missing relevant date of type created")
//
// val rels = resultList.filter(p => p.isInstanceOf[Relation]).asInstanceOf[List[Relation]]
// assertFalse(rels.isEmpty)
// rels.foreach(relation => {
// assertNotNull(relation)
// assertFalse(relation.getSource.isEmpty)
// assertFalse(relation.getTarget.isEmpty)
// assertFalse(relation.getRelClass.isEmpty)
// assertFalse(relation.getRelType.isEmpty)
// assertFalse(relation.getSubRelType.isEmpty)
//
// })
} }
@Test @Test
def testNormalizeDOI(): Unit = { def testNormalizeDOI(): Unit = {
val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString val template = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/article_funder_template.json")).mkString
val line :String = "\"funder\": [{\"name\": \"Wellcome Trust Masters Fellowship\",\"award\": [\"090633\"]}]," val line :String = "\"funder\": [{\"name\": \"Wellcome Trust Masters Fellowship\",\"award\": [\"090633\"]}],"
val json = template.replace("%s", line) val json = template.replace("%s", line)
val resultList: List[Oaf] = Crossref2Oaf.convert(json) val resultList: List[Oaf] = Crossref2Oaf.convert(json)
@ -479,7 +437,7 @@ class CrossrefMappingTest {
@Test @Test
def testNormalizeDOI2(): Unit = { def testNormalizeDOI2(): Unit = {
val template = Source.fromInputStream(getClass.getResourceAsStream("article.json")).mkString val template = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/article.json")).mkString
val resultList: List[Oaf] = Crossref2Oaf.convert(template) val resultList: List[Oaf] = Crossref2Oaf.convert(template)
assertTrue(resultList.nonEmpty) assertTrue(resultList.nonEmpty)
@ -494,7 +452,7 @@ class CrossrefMappingTest {
@Test @Test
def testLicenseVorClosed() :Unit = { def testLicenseVorClosed() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_vor.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/publication_license_vor.json")).mkString
assertNotNull(json) assertNotNull(json)
@ -521,7 +479,7 @@ class CrossrefMappingTest {
@Test @Test
def testLicenseOpen() :Unit = { def testLicenseOpen() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_open.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/publication_license_open.json")).mkString
assertNotNull(json) assertNotNull(json)
@ -544,7 +502,7 @@ class CrossrefMappingTest {
@Test @Test
def testLicenseEmbargoOpen() :Unit = { def testLicenseEmbargoOpen() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_embargo_open.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/publication_license_embargo_open.json")).mkString
assertNotNull(json) assertNotNull(json)
@ -567,7 +525,7 @@ class CrossrefMappingTest {
@Test @Test
def testLicenseEmbargo() :Unit = { def testLicenseEmbargo() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_embargo.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/publication_license_embargo.json")).mkString
assertNotNull(json) assertNotNull(json)
@ -591,7 +549,7 @@ class CrossrefMappingTest {
@Test @Test
def testLicenseEmbargoDateTime() :Unit = { def testLicenseEmbargoDateTime() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("publication_license_embargo_datetime.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/publication_license_embargo_datetime.json")).mkString
assertNotNull(json) assertNotNull(json)
@ -614,7 +572,7 @@ class CrossrefMappingTest {
@Test @Test
def testMultipleURLs() :Unit = { def testMultipleURLs() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("multiple_urls.json")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/crossref/multiple_urls.json")).mkString
assertNotNull(json) assertNotNull(json)

View File

@ -1,11 +1,12 @@
package eu.dnetlib.doiboost.mag package eu.dnetlib.dhp.doiboost.mag
import eu.dnetlib.doiboost.mag.{ConversionUtil, MagPapers, SparkProcessMAG}
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.{Dataset, SparkSession}
import org.codehaus.jackson.map.ObjectMapper import org.codehaus.jackson.map.ObjectMapper
import org.json4s.DefaultFormats
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.json4s.DefaultFormats
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import java.sql.Timestamp import java.sql.Timestamp
@ -47,7 +48,7 @@ class MAGMappingTest {
@Test @Test
def buildInvertedIndexTest(): Unit = { def buildInvertedIndexTest(): Unit = {
val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString val json_input = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/mag/invertedIndex.json")).mkString
val description = ConversionUtil.convertInvertedIndexString(json_input) val description = ConversionUtil.convertInvertedIndexString(json_input)
assertNotNull(description) assertNotNull(description)
assertTrue(description.nonEmpty) assertTrue(description.nonEmpty)
@ -71,7 +72,7 @@ class MAGMappingTest {
.appName(getClass.getSimpleName) .appName(getClass.getSimpleName)
.config(conf) .config(conf)
.getOrCreate() .getOrCreate()
val path = getClass.getResource("magPapers.json").getPath val path = getClass.getResource("/eu/dnetlib/doiboost/mag/magPapers.json").getPath
import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders
val schema = Encoders.product[MagPapers].schema val schema = Encoders.product[MagPapers].schema
@ -101,7 +102,7 @@ class MAGMappingTest {
.appName(getClass.getSimpleName) .appName(getClass.getSimpleName)
.config(conf) .config(conf)
.getOrCreate() .getOrCreate()
val path = getClass.getResource("duplicatedMagPapers.json").getPath val path = getClass.getResource("/eu/dnetlib/doiboost/mag/duplicatedMagPapers.json").getPath
import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders
val schema = Encoders.product[MagPapers].schema val schema = Encoders.product[MagPapers].schema

View File

@ -1,7 +1,8 @@
package eu.dnetlib.doiboost.orcid package eu.dnetlib.dhp.doiboost.orcid
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.oaf.Publication import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.doiboost.orcid._
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession} import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Assertions._
@ -10,9 +11,8 @@ import org.junit.jupiter.api.io.TempDir
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
import java.nio.file.Path import java.nio.file.Path
import scala.io.Source
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.io.Source
class MappingORCIDToOAFTest { class MappingORCIDToOAFTest {
val logger: Logger = LoggerFactory.getLogger(ORCIDToOAF.getClass) val logger: Logger = LoggerFactory.getLogger(ORCIDToOAF.getClass)
@ -20,7 +20,7 @@ class MappingORCIDToOAFTest {
@Test @Test
def testExtractData():Unit ={ def testExtractData():Unit ={
val json = Source.fromInputStream(getClass.getResourceAsStream("dataOutput")).mkString val json = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/orcid/dataOutput")).mkString
assertNotNull(json) assertNotNull(json)
assertFalse(json.isEmpty) assertFalse(json.isEmpty)
json.lines.foreach(s => { json.lines.foreach(s => {

View File

@ -1,13 +1,13 @@
package eu.dnetlib.doiboost.uw package eu.dnetlib.dhp.doiboost.uw
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.oaf.OpenAccessRoute import eu.dnetlib.dhp.schema.oaf.OpenAccessRoute
import eu.dnetlib.doiboost.uw.UnpayWallToOAF
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.slf4j.{Logger, LoggerFactory}
import scala.io.Source import scala.io.Source
import org.junit.jupiter.api.Assertions._
import org.slf4j.{Logger, LoggerFactory}
class UnpayWallMappingTest { class UnpayWallMappingTest {
@ -18,7 +18,7 @@ class UnpayWallMappingTest {
@Test @Test
def testMappingToOAF():Unit ={ def testMappingToOAF():Unit ={
val Ilist = Source.fromInputStream(getClass.getResourceAsStream("input.json")).mkString val Ilist = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/doiboost/uw/input.json")).mkString
var i:Int = 0 var i:Int = 0
for (line <-Ilist.lines) { for (line <-Ilist.lines) {