forked from antonis.lempesis/dnet-hadoop
improvement DOIBoost
This commit is contained in:
parent
13815d5d13
commit
7ac1ba2e35
|
@ -1,7 +1,7 @@
|
|||
package eu.dnetlib.doiboost
|
||||
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction
|
||||
import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty}
|
||||
import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, Instance, KeyValue, Oaf, Organization, Publication, Qualifier, Relation, Result, StructuredProperty}
|
||||
import eu.dnetlib.dhp.utils.DHPUtils
|
||||
import org.apache.commons.lang3.StringUtils
|
||||
import org.codehaus.jackson.map.ObjectMapper
|
||||
|
@ -14,11 +14,21 @@ import scala.collection.JavaConverters._
|
|||
import scala.io.Source
|
||||
|
||||
|
||||
case class HostedByItemType(id: String, officialName: String, issn: String, eissn: String, lissn: String, openAccess: Boolean) {}
|
||||
case class HostedByItemType(id: String, officialname: String, issn: String, eissn: String, lissn: String, openAccess: Boolean) {}
|
||||
|
||||
case class DoiBoostAffiliation(PaperId:Long, AffiliationId:Long, GridId:String){}
|
||||
case class DoiBoostAffiliation(PaperId:Long, AffiliationId:Long, GridId:Option[String], OfficialPage:Option[String], DisplayName:Option[String]){}
|
||||
|
||||
object DoiBoostMappingUtil {
|
||||
def getUnknownCountry(): Qualifier = {
|
||||
createQualifier("UNKNOWN","UNKNOWN","dnet:countries","dnet:countries")
|
||||
}
|
||||
|
||||
|
||||
|
||||
def generateMAGAffiliationId(affId: String): String = {
|
||||
s"20|microsoft___$SEPARATOR${DHPUtils.md5(affId)}"
|
||||
}
|
||||
|
||||
|
||||
val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||
|
||||
|
@ -52,6 +62,11 @@ object DoiBoostMappingUtil {
|
|||
a.setClazz(classOf[Publication])
|
||||
a.setPayload(publication)
|
||||
(publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
case organization: Organization =>
|
||||
val a: AtomicAction[Organization] = new AtomicAction[Organization]
|
||||
a.setClazz(classOf[Organization])
|
||||
a.setPayload(organization)
|
||||
(organization.getClass.getCanonicalName, mapper.writeValueAsString(a))
|
||||
case relation: Relation =>
|
||||
val a: AtomicAction[Relation] = new AtomicAction[Relation]
|
||||
a.setClazz(classOf[Relation])
|
||||
|
@ -64,27 +79,33 @@ object DoiBoostMappingUtil {
|
|||
}
|
||||
|
||||
|
||||
def retrieveHostedByMap(): Map[String, HostedByItemType] = {
|
||||
def toHostedByItem(input:String): (String, HostedByItemType) = {
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
val jsonMap = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/hbMap.json")).mkString
|
||||
lazy val json: json4s.JValue = parse(jsonMap)
|
||||
json.extract[Map[String, HostedByItemType]]
|
||||
|
||||
lazy val json: json4s.JValue = parse(input)
|
||||
val c :Map[String,HostedByItemType] = json.extract[Map[String, HostedByItemType]]
|
||||
(c.keys.head, c.values.head)
|
||||
}
|
||||
|
||||
def retrieveHostedByItem(issn: String, eissn: String, lissn: String, hostedByMap: Map[String, HostedByItemType]): HostedByItemType = {
|
||||
if (issn != null && issn.nonEmpty && hostedByMap.contains(issn))
|
||||
return hostedByMap(issn)
|
||||
|
||||
if (eissn != null && eissn.nonEmpty && hostedByMap.contains(eissn))
|
||||
return hostedByMap(eissn)
|
||||
|
||||
if (lissn != null && lissn.nonEmpty && hostedByMap.contains(lissn))
|
||||
return hostedByMap(lissn)
|
||||
|
||||
null
|
||||
def toISSNPair(publication: Publication) : (String, Publication) = {
|
||||
val issn = if (publication.getJournal == null) null else publication.getJournal.getIssnPrinted
|
||||
val eissn =if (publication.getJournal == null) null else publication.getJournal.getIssnOnline
|
||||
val lissn =if (publication.getJournal == null) null else publication.getJournal.getIssnLinking
|
||||
|
||||
if (issn!= null && issn.nonEmpty)
|
||||
(issn, publication)
|
||||
else if(eissn!= null && eissn.nonEmpty)
|
||||
(eissn, publication)
|
||||
else if(lissn!= null && lissn.nonEmpty)
|
||||
(lissn, publication)
|
||||
else
|
||||
(publication.getId, publication)
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
def generateGridAffiliationId(gridId:String) :String = {
|
||||
s"20|grid________::${DHPUtils.md5(gridId.toLowerCase().trim())}"
|
||||
}
|
||||
|
@ -96,18 +117,36 @@ object DoiBoostMappingUtil {
|
|||
result.getInstance().asScala.foreach(i => i.setInstancetype(instanceType.get.getInstancetype))
|
||||
}
|
||||
result.getInstance().asScala.foreach(i => {
|
||||
val hb = new KeyValue
|
||||
hb.setValue("Unknown Repository")
|
||||
hb.setKey(s"10|$OPENAIRE_PREFIX::55045bd2a65019fd8e6741a755395c8c")
|
||||
i.setHostedby(hb)
|
||||
i.setHostedby(getUbknownHostedBy())
|
||||
})
|
||||
result
|
||||
}
|
||||
|
||||
def fixPublication(publication: Publication, hostedByMap: Map[String, HostedByItemType]): Publication = {
|
||||
val issn = if (publication.getJournal == null) null else publication.getJournal.getIssnPrinted
|
||||
val eissn =if (publication.getJournal == null) null else publication.getJournal.getIssnOnline
|
||||
val lissn =if (publication.getJournal == null) null else publication.getJournal.getIssnLinking
|
||||
def getUbknownHostedBy():KeyValue = {
|
||||
val hb = new KeyValue
|
||||
hb.setValue("Unknown Repository")
|
||||
hb.setKey(s"10|$OPENAIRE_PREFIX::55045bd2a65019fd8e6741a755395c8c")
|
||||
hb
|
||||
|
||||
}
|
||||
|
||||
|
||||
def getOpenAccessQualifier():Qualifier = {
|
||||
createQualifier("OPEN","Open Access","dnet:access_modes", "dnet:access_modes")
|
||||
|
||||
}
|
||||
|
||||
def getRestrictedQualifier():Qualifier = {
|
||||
createQualifier("RESTRICTED","Restricted","dnet:access_modes", "dnet:access_modes")
|
||||
|
||||
}
|
||||
|
||||
def fixPublication(input:((String,Publication), (String,HostedByItemType))): Publication = {
|
||||
|
||||
val publication = input._1._2
|
||||
|
||||
val item = if (input._2 != null) input._2._2 else null
|
||||
|
||||
|
||||
val instanceType = publication.getInstance().asScala.find(i => i.getInstancetype != null && i.getInstancetype.getClassid.nonEmpty)
|
||||
|
||||
|
@ -115,15 +154,15 @@ object DoiBoostMappingUtil {
|
|||
publication.getInstance().asScala.foreach(i => i.setInstancetype(instanceType.get.getInstancetype))
|
||||
}
|
||||
|
||||
val item = retrieveHostedByItem(issn, eissn, lissn, hostedByMap)
|
||||
|
||||
publication.getInstance().asScala.foreach(i => {
|
||||
val hb = new KeyValue
|
||||
if (item != null) {
|
||||
hb.setValue(item.officialName)
|
||||
hb.setValue(item.officialname)
|
||||
hb.setKey(generateDSId(item.id))
|
||||
if (item.openAccess)
|
||||
i.setAccessright(createQualifier("OPEN", "dnet:access_modes"))
|
||||
publication.setBestaccessright(createQualifier("OPEN", "dnet:access_modes"))
|
||||
i.setAccessright(getOpenAccessQualifier())
|
||||
publication.setBestaccessright(getOpenAccessQualifier())
|
||||
}
|
||||
else {
|
||||
hb.setValue("Unknown Repository")
|
||||
|
@ -135,10 +174,10 @@ object DoiBoostMappingUtil {
|
|||
val ar = publication.getInstance().asScala.filter(i => i.getInstancetype != null && i.getAccessright!= null && i.getAccessright.getClassid!= null).map(f=> f.getAccessright.getClassid)
|
||||
if (ar.nonEmpty) {
|
||||
if(ar.contains("OPEN")){
|
||||
publication.setBestaccessright(createQualifier("OPEN", "dnet:access_modes"))
|
||||
publication.setBestaccessright(getOpenAccessQualifier())
|
||||
}
|
||||
else {
|
||||
publication.setBestaccessright(createQualifier(ar.head, "dnet:access_modes"))
|
||||
publication.setBestaccessright(getRestrictedQualifier())
|
||||
}
|
||||
}
|
||||
publication
|
||||
|
@ -298,6 +337,8 @@ object DoiBoostMappingUtil {
|
|||
}
|
||||
|
||||
|
||||
|
||||
|
||||
def createMAGCollectedFrom(): KeyValue = {
|
||||
|
||||
val cf = new KeyValue
|
||||
|
|
|
@ -2,10 +2,13 @@ package eu.dnetlib.doiboost
|
|||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction
|
||||
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
|
||||
import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset => OafDataset}
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.apache.hadoop.io.Text
|
||||
import org.apache.hadoop.io.compress.GzipCodec
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.sql.{ Encoder, Encoders, SaveMode, SparkSession}
|
||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
object SparkGenerateDOIBoostActionSet {
|
||||
|
@ -23,8 +26,10 @@ object SparkGenerateDOIBoostActionSet {
|
|||
.master(parser.get("master")).getOrCreate()
|
||||
|
||||
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
|
||||
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
|
||||
implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]
|
||||
implicit val mapEncoderAS: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
|
||||
|
||||
implicit val mapEncoderAtomiAction: Encoder[AtomicAction[OafDataset]] = Encoders.kryo[AtomicAction[OafDataset]]
|
||||
|
||||
|
@ -32,17 +37,23 @@ object SparkGenerateDOIBoostActionSet {
|
|||
val dbDatasetPath = parser.get("dbDatasetPath")
|
||||
val crossRefRelation = parser.get("crossRefRelation")
|
||||
val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath")
|
||||
val dbOrganizationPath = parser.get("dbOrganizationPath")
|
||||
val workingDirPath = parser.get("targetPath")
|
||||
|
||||
spark.read.load(dbDatasetPath).as[OafDataset]
|
||||
.map(d =>DoiBoostMappingUtil.fixResult(d))
|
||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/actionSet")
|
||||
|
||||
spark.read.load(dbPublicationPath).as[Publication]
|
||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||
|
||||
spark.read.load(dbOrganizationPath).as[Organization]
|
||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||
|
||||
|
||||
spark.read.load(crossRefRelation).as[Relation]
|
||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||
|
@ -52,15 +63,9 @@ object SparkGenerateDOIBoostActionSet {
|
|||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||
|
||||
|
||||
// implicit val mapEncoderPub: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
|
||||
//
|
||||
// val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)]
|
||||
//
|
||||
//
|
||||
//
|
||||
// d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset_a8c2f90b-a3ae-4d6e-8187-47a437156e18_1590223414", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text,Text]], classOf[GzipCodec])
|
||||
|
||||
val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)]
|
||||
|
||||
d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package eu.dnetlib.doiboost
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
|
||||
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset, Organization}
|
||||
import eu.dnetlib.doiboost.mag.ConversionUtil
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.apache.spark.SparkConf
|
||||
|
@ -26,22 +26,13 @@ object SparkGenerateDoiBoost {
|
|||
.master(parser.get("master")).getOrCreate()
|
||||
|
||||
import spark.implicits._
|
||||
val crossrefPublicationPath = parser.get("crossrefPublicationPath")
|
||||
val crossrefDatasetPath = parser.get("crossrefDatasetPath")
|
||||
val uwPublicationPath = parser.get("uwPublicationPath")
|
||||
val magPublicationPath = parser.get("magPublicationPath")
|
||||
val orcidPublicationPath = parser.get("orcidPublicationPath")
|
||||
|
||||
val hostedByMapPath = parser.get("hostedByMapPath")
|
||||
val workingDirPath = parser.get("workingDirPath")
|
||||
|
||||
|
||||
// logger.info("Phase 1) repartition and move all the dataset in a same working folder")
|
||||
// spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication")
|
||||
// spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s => s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset")
|
||||
// spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication")
|
||||
// spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication")
|
||||
// spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication")
|
||||
|
||||
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization]
|
||||
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
|
||||
implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
|
||||
implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]
|
||||
|
@ -75,23 +66,26 @@ object SparkGenerateDoiBoost {
|
|||
sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication")
|
||||
|
||||
|
||||
val doiBoostPublication: Dataset[Publication] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication]
|
||||
val doiBoostPublication: Dataset[(String,Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication].filter(p=>DoiBoostMappingUtil.filterPublication(p)).map(DoiBoostMappingUtil.toISSNPair)(tupleForJoinEncoder)
|
||||
|
||||
val map = DoiBoostMappingUtil.retrieveHostedByMap()
|
||||
val hostedByDataset : Dataset[(String, HostedByItemType)] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath).map(DoiBoostMappingUtil.toHostedByItem))
|
||||
|
||||
doiBoostPublication.filter(p=>DoiBoostMappingUtil.filterPublication(p)).map(p => DoiBoostMappingUtil.fixPublication(p, map)).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered")
|
||||
|
||||
doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left")
|
||||
.map(DoiBoostMappingUtil.fixPublication)
|
||||
.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered")
|
||||
|
||||
val affiliationPath = parser.get("affiliationPath")
|
||||
val paperAffiliationPath = parser.get("paperAffiliationPath")
|
||||
|
||||
val affiliation = spark.read.load(affiliationPath).where(col("GridId").isNotNull).select(col("AffiliationId"), col("GridId"))
|
||||
val affiliation = spark.read.load(affiliationPath).select(col("AffiliationId"), col("GridId"), col("OfficialPage"), col("DisplayName"))
|
||||
|
||||
val paperAffiliation = spark.read.load(paperAffiliationPath).select(col("AffiliationId").alias("affId"), col("PaperId"))
|
||||
|
||||
|
||||
val a:Dataset[DoiBoostAffiliation] = paperAffiliation
|
||||
.joinWith(affiliation, paperAffiliation("affId").equalTo(affiliation("AffiliationId"))).select(col("_1.PaperId"), col("_2.AffiliationId"), col("_2.GridId")).as[DoiBoostAffiliation]
|
||||
.joinWith(affiliation, paperAffiliation("affId").equalTo(affiliation("AffiliationId")))
|
||||
.select(col("_1.PaperId"), col("_2.AffiliationId"), col("_2.GridId"), col("_2.OfficialPage"), col("_2.DisplayName")).as[DoiBoostAffiliation]
|
||||
|
||||
|
||||
|
||||
|
@ -102,24 +96,45 @@ object SparkGenerateDoiBoost {
|
|||
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).flatMap(item => {
|
||||
val pub:Publication = item._1._2
|
||||
val affiliation = item._2
|
||||
val affId:String = if (affiliation.GridId.isDefined) DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId.get) else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString)
|
||||
val r:Relation = new Relation
|
||||
r.setSource(pub.getId)
|
||||
r.setTarget(DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId))
|
||||
r.setTarget(affId)
|
||||
r.setRelType("resultOrganization")
|
||||
r.setRelClass("hasAuthorInstitution")
|
||||
r.setSubRelType("affiliation")
|
||||
r.setDataInfo(pub.getDataInfo)
|
||||
r.setCollectedfrom(pub.getCollectedfrom)
|
||||
r.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava)
|
||||
val r1:Relation = new Relation
|
||||
r1.setTarget(pub.getId)
|
||||
r1.setSource(DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId))
|
||||
r1.setSource(affId)
|
||||
r1.setRelType("resultOrganization")
|
||||
r1.setRelClass("isAuthorInstitutionOf")
|
||||
r1.setSubRelType("affiliation")
|
||||
r1.setDataInfo(pub.getDataInfo)
|
||||
r1.setCollectedfrom(pub.getCollectedfrom)
|
||||
r1.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava)
|
||||
List(r, r1)
|
||||
})(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation")
|
||||
}
|
||||
|
||||
|
||||
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).map( item => {
|
||||
val affiliation = item._2
|
||||
if (affiliation.GridId.isEmpty) {
|
||||
val o = new Organization
|
||||
o.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava)
|
||||
o.setDataInfo(DoiBoostMappingUtil.generateDataInfo())
|
||||
o.setId(DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString))
|
||||
o.setOriginalId(List(affiliation.AffiliationId.toString).asJava)
|
||||
if (affiliation.DisplayName.nonEmpty)
|
||||
o.setLegalname(DoiBoostMappingUtil.asField(affiliation.DisplayName.get))
|
||||
if (affiliation.OfficialPage.isDefined)
|
||||
o.setWebsiteurl(DoiBoostMappingUtil.asField(affiliation.OfficialPage.get))
|
||||
o.setCountry(DoiBoostMappingUtil.getUnknownCountry())
|
||||
o
|
||||
}
|
||||
else
|
||||
null
|
||||
}).filter(o=> o!=null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -106,7 +106,9 @@ case object Crossref2Oaf {
|
|||
|
||||
// Publisher ( Name of work's publisher mapped into Result/Publisher)
|
||||
val publisher = (json \ "publisher").extractOrElse[String](null)
|
||||
result.setPublisher(asField(publisher))
|
||||
if (publisher!= null && publisher.nonEmpty)
|
||||
result.setPublisher(asField(publisher))
|
||||
|
||||
|
||||
// TITLE
|
||||
val mainTitles = for {JString(title) <- json \ "title" if title.nonEmpty} yield createSP(title, "main title", "dnet:dataCite_title")
|
||||
|
@ -120,7 +122,7 @@ case object Crossref2Oaf {
|
|||
result.setDescription(descriptionList.asJava)
|
||||
|
||||
// Source
|
||||
val sourceList = for {JString(source) <- json \ "source" if source.nonEmpty} yield asField(source)
|
||||
val sourceList = for {JString(source) <- json \ "source" if source!= null && source.nonEmpty} yield asField(source)
|
||||
result.setSource(sourceList.asJava)
|
||||
|
||||
//RELEVANT DATE Mapping
|
||||
|
@ -168,7 +170,7 @@ case object Crossref2Oaf {
|
|||
instance.setRefereed(asField("peerReviewed"))
|
||||
|
||||
|
||||
instance.setAccessright(createQualifier("RESTRICTED", "dnet:access_modes"))
|
||||
instance.setAccessright(getRestrictedQualifier())
|
||||
result.setInstance(List(instance).asJava)
|
||||
instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource"))
|
||||
result.setResourcetype(createQualifier(cobjCategory.substring(0, 4),"dnet:dataCite_resource"))
|
||||
|
|
|
@ -115,7 +115,7 @@ case object ConversionUtil {
|
|||
j.setName(ci.DisplayName.get)
|
||||
if (ci.StartDate.isDefined && ci.EndDate.isDefined)
|
||||
{
|
||||
j.setConferencedate(s"${ci.StartDate.get.toString} - ${ci.EndDate.get.toString}")
|
||||
j.setConferencedate(s"${ci.StartDate.get.toString.substring(0,10)} - ${ci.EndDate.get.toString.substring(0,10)}")
|
||||
}
|
||||
|
||||
publication.setJournal(j)
|
||||
|
@ -214,7 +214,7 @@ case object ConversionUtil {
|
|||
|
||||
|
||||
if (paper.Date != null && paper.Date.isDefined) {
|
||||
pub.setDateofacceptance(asField(paper.Date.get.toString))
|
||||
pub.setDateofacceptance(asField(paper.Date.get.toString.substring(0,10)))
|
||||
}
|
||||
pub.setPublisher(asField(paper.Publisher))
|
||||
|
||||
|
@ -280,7 +280,7 @@ case object ConversionUtil {
|
|||
|
||||
|
||||
if (paper.Date != null) {
|
||||
pub.setDateofacceptance(asField(paper.Date.toString))
|
||||
pub.setDateofacceptance(asField(paper.Date.toString.substring(0,10)))
|
||||
}
|
||||
|
||||
pub.setAuthor(authorsOAF.asJava)
|
||||
|
|
|
@ -27,7 +27,7 @@ object SparkMapUnpayWallToOAF {
|
|||
.appName(getClass.getSimpleName)
|
||||
.master(parser.get("master")).getOrCreate()
|
||||
|
||||
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.bean(classOf[Publication])
|
||||
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
|
||||
|
||||
val sourcePath = parser.get("sourcePath")
|
||||
|
@ -35,7 +35,8 @@ object SparkMapUnpayWallToOAF {
|
|||
val inputRDD:RDD[String] = spark.sparkContext.textFile(s"$sourcePath")
|
||||
|
||||
logger.info("Converting UnpayWall to OAF")
|
||||
val d:Dataset[Publication] = spark.createDataset(inputRDD.repartition(1000).map(UnpayWallToOAF.convertToOAF).filter(p=>p!=null)).as[Publication]
|
||||
|
||||
val d:Dataset[Publication] = spark.createDataset(inputRDD.map(UnpayWallToOAF.convertToOAF).filter(p=>p!=null)).as[Publication]
|
||||
d.write.mode(SaveMode.Overwrite).save(targetPath)
|
||||
}
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@ object UnpayWallToOAF {
|
|||
val i :Instance= new Instance()
|
||||
|
||||
i.setCollectedfrom(createUnpayWallCollectedFrom())
|
||||
i.setAccessright(createQualifier("OPEN", "dnet:access_modes"))
|
||||
i.setAccessright(getOpenAccessQualifier())
|
||||
i.setUrl(List(oaLocation.url.get).asJava)
|
||||
|
||||
if (oaLocation.license.isDefined)
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
[
|
||||
{"paramName": "m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true},
|
||||
{"paramName": "dp", "paramLongName":"dbPublicationPath", "paramDescription": "the Crossref Publication Path", "paramRequired": true},
|
||||
{"paramName": "dd", "paramLongName":"dbDatasetPath", "paramDescription": "the Crossref Dataset Path", "paramRequired": true},
|
||||
{"paramName": "cr", "paramLongName":"crossRefRelation", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true},
|
||||
{"paramName": "m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true},
|
||||
{"paramName": "dp", "paramLongName":"dbPublicationPath", "paramDescription": "the Crossref Publication Path", "paramRequired": true},
|
||||
{"paramName": "dd", "paramLongName":"dbDatasetPath", "paramDescription": "the Crossref Dataset Path", "paramRequired": true},
|
||||
{"paramName": "cr", "paramLongName":"crossRefRelation", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true},
|
||||
{"paramName": "da", "paramLongName":"dbaffiliationRelationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true},
|
||||
{"paramName": "w", "paramLongName":"targetPath", "paramDescription": "the Working Path", "paramRequired": true}
|
||||
{"paramName": "do", "paramLongName":"dbOrganizationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true},
|
||||
{"paramName": "w", "paramLongName":"targetPath", "paramDescription": "the Working Path", "paramRequired": true}
|
||||
]
|
||||
|
|
|
@ -1,10 +1,6 @@
|
|||
[
|
||||
{"paramName": "m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true},
|
||||
{"paramName": "cp", "paramLongName":"crossrefPublicationPath", "paramDescription": "the Crossref Publication Path", "paramRequired": true},
|
||||
{"paramName": "cd", "paramLongName":"crossrefDatasetPath", "paramDescription": "the Crossref Dataset Path", "paramRequired": true},
|
||||
{"paramName": "up", "paramLongName":"uwPublicationPath", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true},
|
||||
{"paramName": "mp", "paramLongName":"magPublicationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true},
|
||||
{"paramName": "op", "paramLongName":"orcidPublicationPath", "paramDescription": "the ORCID Publication Path", "paramRequired": true},
|
||||
{"paramName": "hb", "paramLongName":"hostedByMapPath", "paramDescription": "the hosted By Map Path", "paramRequired": true},
|
||||
{"paramName": "ap", "paramLongName":"affiliationPath", "paramDescription": "the Affliation Path", "paramRequired": true},
|
||||
{"paramName": "pa", "paramLongName":"paperAffiliationPath", "paramDescription": "the paperAffiliation Path", "paramRequired": true},
|
||||
{"paramName": "w", "paramLongName":"workingDirPath", "paramDescription": "the Working Path", "paramRequired": true}
|
||||
|
|
|
@ -1,37 +1,17 @@
|
|||
<workflow-app name="Create DOIBoostActionSet" xmlns="uri:oozie:workflow:0.5">
|
||||
<parameters>
|
||||
<property>
|
||||
<name>crossrefPublicationPath</name>
|
||||
<description>the Crossref Publication Path</description>
|
||||
<name>hostedByMapPath</name>
|
||||
<description>the Hosted By Map Path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>crossrefDatasetPath</name>
|
||||
<description>the Crossref Dataset Path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>uwPublicationPath</name>
|
||||
<description>the UnpayWall Publication Path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>magPublicationPath</name>
|
||||
<description>the MAG Publication Path</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>orcidPublicationPath</name>
|
||||
<description>the ORCID Publication Path</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>affiliationPath</name>
|
||||
<description>the Affliation Path</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>paperAffiliationPath</name>
|
||||
<description>the paperAffiliation Path</description>
|
||||
</property>
|
||||
|
||||
|
||||
<property>
|
||||
<name>workingDirPath</name>
|
||||
<description>the Working Path</description>
|
||||
|
@ -52,7 +32,7 @@
|
|||
|
||||
|
||||
|
||||
<start to="CreateDOIBoost"/>
|
||||
<start to="GenerateActionSet"/>
|
||||
|
||||
|
||||
<kill name="Kill">
|
||||
|
@ -82,17 +62,13 @@
|
|||
--conf spark.sql.shuffle.partitions=3840
|
||||
${sparkExtraOPT}
|
||||
</spark-opts>
|
||||
<arg>--crossrefPublicationPath</arg><arg>${crossrefPublicationPath}</arg>
|
||||
<arg>--crossrefDatasetPath</arg><arg>${crossrefDatasetPath}</arg>
|
||||
<arg>--uwPublicationPath</arg><arg>${uwPublicationPath}</arg>
|
||||
<arg>--magPublicationPath</arg><arg>${magPublicationPath}</arg>
|
||||
<arg>--orcidPublicationPath</arg><arg>${orcidPublicationPath}</arg>
|
||||
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
|
||||
<arg>--affiliationPath</arg><arg>${affiliationPath}</arg>
|
||||
<arg>--paperAffiliationPath</arg><arg>${paperAffiliationPath}</arg>
|
||||
<arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
|
||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<ok to="GenerateActionSet"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
@ -115,6 +91,7 @@
|
|||
<arg>--dbDatasetPath</arg><arg>${workingDirPath}/crossrefDataset</arg>
|
||||
<arg>--crossRefRelation</arg><arg>/data/doiboost/input/crossref/relations</arg>
|
||||
<arg>--dbaffiliationRelationPath</arg><arg>${workingDirPath}/doiBoostPublicationAffiliation</arg>
|
||||
<arg>-do</arg><arg>${workingDirPath}/doiBoostOrganization</arg>
|
||||
<arg>--targetPath</arg><arg>${workingDirPath}/actionDataSet</arg>
|
||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||
</spark>
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package eu.dnetlib.dhp.doiboost
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.{Publication, Dataset => OafDataset}
|
||||
import eu.dnetlib.doiboost.DoiBoostMappingUtil
|
||||
import eu.dnetlib.doiboost.{DoiBoostMappingUtil, HostedByItemType}
|
||||
import eu.dnetlib.doiboost.SparkGenerateDoiBoost.getClass
|
||||
import eu.dnetlib.doiboost.mag.ConversionUtil
|
||||
import eu.dnetlib.doiboost.orcid.ORCIDElement
|
||||
|
@ -15,45 +15,44 @@ import scala.io.Source
|
|||
|
||||
class DoiBoostHostedByMapTest {
|
||||
|
||||
@Test
|
||||
def testLoadMap(): Unit = {
|
||||
println(DoiBoostMappingUtil.retrieveHostedByMap().keys.size)
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
def testMerge():Unit = {
|
||||
val conf: SparkConf = new SparkConf()
|
||||
val spark: SparkSession =
|
||||
SparkSession
|
||||
.builder()
|
||||
.config(conf)
|
||||
.appName(getClass.getSimpleName)
|
||||
.master("local[*]").getOrCreate()
|
||||
|
||||
|
||||
|
||||
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
|
||||
implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
|
||||
|
||||
|
||||
import spark.implicits._
|
||||
val dataset:Dataset[ORCIDElement] = spark.read.json("/home/sandro/orcid").as[ORCIDElement]
|
||||
|
||||
|
||||
dataset.show(false)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
// @Test
|
||||
// def testMerge():Unit = {
|
||||
// val conf: SparkConf = new SparkConf()
|
||||
// val spark: SparkSession =
|
||||
// SparkSession
|
||||
// .builder()
|
||||
// .config(conf)
|
||||
// .appName(getClass.getSimpleName)
|
||||
// .master("local[*]").getOrCreate()
|
||||
//
|
||||
//
|
||||
//
|
||||
// implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
// implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
|
||||
// implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
|
||||
//
|
||||
//
|
||||
// import spark.implicits._
|
||||
// val dataset:RDD[String]= spark.sparkContext.textFile("/home/sandro/Downloads/hbMap.gz")
|
||||
//
|
||||
//
|
||||
// val hbMap:Dataset[(String, HostedByItemType)] =spark.createDataset(dataset.map(DoiBoostMappingUtil.toHostedByItem))
|
||||
//
|
||||
//
|
||||
// hbMap.show()
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
//
|
||||
// }
|
||||
|
||||
|
||||
@Test
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package eu.dnetlib.doiboost.mag
|
||||
|
||||
import java.sql.Timestamp
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.Publication
|
||||
import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature
|
||||
import org.apache.spark.SparkConf
|
||||
|
@ -39,6 +41,17 @@ class MAGMappingTest {
|
|||
|
||||
|
||||
|
||||
@Test
|
||||
def testDate() :Unit = {
|
||||
|
||||
val p:Timestamp = Timestamp.valueOf("2011-10-02 00:00:00")
|
||||
|
||||
println(p.toString.substring(0,10))
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
def buildInvertedIndexTest(): Unit = {
|
||||
val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString
|
||||
|
|
Loading…
Reference in New Issue