From 7ac1ba2e3526d096b5f1ee2f6aac007b15e6d58a Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 4 Jun 2020 14:39:20 +0200 Subject: [PATCH] improvement DOIBoost --- .../doiboost/DoiBoostMappingUtil.scala | 103 ++++++++++++------ .../SparkGenerateDOIBoostActionSet.scala | 27 +++-- .../doiboost/SparkGenerateDoiBoost.scala | 61 +++++++---- .../doiboost/crossref/Crossref2Oaf.scala | 8 +- .../dnetlib/doiboost/mag/MagDataModel.scala | 6 +- .../doiboost/uw/SparkMapUnpayWallToOAF.scala | 5 +- .../dnetlib/doiboost/uw/UnpayWallToOAF.scala | 2 +- .../doiboost/generate_doiboost_as_params.json | 11 +- .../doiboost/generate_doiboost_params.json | 6 +- .../intersection/oozie_app/workflow.xml | 35 +----- .../doiboost/DoiBoostHostedByMapTest.scala | 75 +++++++------ .../dnetlib/doiboost/mag/MAGMappingTest.scala | 13 +++ 12 files changed, 201 insertions(+), 151 deletions(-) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala index 7f528f149..90bfacdc9 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala @@ -1,7 +1,7 @@ package eu.dnetlib.doiboost import eu.dnetlib.dhp.schema.action.AtomicAction -import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty} +import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, Instance, KeyValue, Oaf, Organization, Publication, Qualifier, Relation, Result, StructuredProperty} import eu.dnetlib.dhp.utils.DHPUtils import org.apache.commons.lang3.StringUtils import org.codehaus.jackson.map.ObjectMapper @@ -14,11 +14,21 @@ import scala.collection.JavaConverters._ import scala.io.Source -case class HostedByItemType(id: String, officialName: String, issn: String, eissn: String, lissn: String, openAccess: Boolean) {} +case class HostedByItemType(id: String, officialname: String, issn: String, eissn: String, lissn: String, openAccess: Boolean) {} -case class DoiBoostAffiliation(PaperId:Long, AffiliationId:Long, GridId:String){} +case class DoiBoostAffiliation(PaperId:Long, AffiliationId:Long, GridId:Option[String], OfficialPage:Option[String], DisplayName:Option[String]){} object DoiBoostMappingUtil { + def getUnknownCountry(): Qualifier = { + createQualifier("UNKNOWN","UNKNOWN","dnet:countries","dnet:countries") + } + + + + def generateMAGAffiliationId(affId: String): String = { + s"20|microsoft___$SEPARATOR${DHPUtils.md5(affId)}" + } + val logger: Logger = LoggerFactory.getLogger(getClass) @@ -52,6 +62,11 @@ object DoiBoostMappingUtil { a.setClazz(classOf[Publication]) a.setPayload(publication) (publication.getClass.getCanonicalName, mapper.writeValueAsString(a)) + case organization: Organization => + val a: AtomicAction[Organization] = new AtomicAction[Organization] + a.setClazz(classOf[Organization]) + a.setPayload(organization) + (organization.getClass.getCanonicalName, mapper.writeValueAsString(a)) case relation: Relation => val a: AtomicAction[Relation] = new AtomicAction[Relation] a.setClazz(classOf[Relation]) @@ -64,27 +79,33 @@ object DoiBoostMappingUtil { } - def retrieveHostedByMap(): Map[String, HostedByItemType] = { + def toHostedByItem(input:String): (String, HostedByItemType) = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - val jsonMap = Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/hbMap.json")).mkString - lazy val json: json4s.JValue = parse(jsonMap) - json.extract[Map[String, HostedByItemType]] + + lazy val json: json4s.JValue = parse(input) + val c :Map[String,HostedByItemType] = json.extract[Map[String, HostedByItemType]] + (c.keys.head, c.values.head) } - def retrieveHostedByItem(issn: String, eissn: String, lissn: String, hostedByMap: Map[String, HostedByItemType]): HostedByItemType = { - if (issn != null && issn.nonEmpty && hostedByMap.contains(issn)) - return hostedByMap(issn) - if (eissn != null && eissn.nonEmpty && hostedByMap.contains(eissn)) - return hostedByMap(eissn) - - if (lissn != null && lissn.nonEmpty && hostedByMap.contains(lissn)) - return hostedByMap(lissn) - - null + def toISSNPair(publication: Publication) : (String, Publication) = { + val issn = if (publication.getJournal == null) null else publication.getJournal.getIssnPrinted + val eissn =if (publication.getJournal == null) null else publication.getJournal.getIssnOnline + val lissn =if (publication.getJournal == null) null else publication.getJournal.getIssnLinking + if (issn!= null && issn.nonEmpty) + (issn, publication) + else if(eissn!= null && eissn.nonEmpty) + (eissn, publication) + else if(lissn!= null && lissn.nonEmpty) + (lissn, publication) + else + (publication.getId, publication) } + + + def generateGridAffiliationId(gridId:String) :String = { s"20|grid________::${DHPUtils.md5(gridId.toLowerCase().trim())}" } @@ -96,18 +117,36 @@ object DoiBoostMappingUtil { result.getInstance().asScala.foreach(i => i.setInstancetype(instanceType.get.getInstancetype)) } result.getInstance().asScala.foreach(i => { - val hb = new KeyValue - hb.setValue("Unknown Repository") - hb.setKey(s"10|$OPENAIRE_PREFIX::55045bd2a65019fd8e6741a755395c8c") - i.setHostedby(hb) + i.setHostedby(getUbknownHostedBy()) }) result } - def fixPublication(publication: Publication, hostedByMap: Map[String, HostedByItemType]): Publication = { - val issn = if (publication.getJournal == null) null else publication.getJournal.getIssnPrinted - val eissn =if (publication.getJournal == null) null else publication.getJournal.getIssnOnline - val lissn =if (publication.getJournal == null) null else publication.getJournal.getIssnLinking + def getUbknownHostedBy():KeyValue = { + val hb = new KeyValue + hb.setValue("Unknown Repository") + hb.setKey(s"10|$OPENAIRE_PREFIX::55045bd2a65019fd8e6741a755395c8c") + hb + + } + + + def getOpenAccessQualifier():Qualifier = { + createQualifier("OPEN","Open Access","dnet:access_modes", "dnet:access_modes") + + } + + def getRestrictedQualifier():Qualifier = { + createQualifier("RESTRICTED","Restricted","dnet:access_modes", "dnet:access_modes") + + } + + def fixPublication(input:((String,Publication), (String,HostedByItemType))): Publication = { + + val publication = input._1._2 + + val item = if (input._2 != null) input._2._2 else null + val instanceType = publication.getInstance().asScala.find(i => i.getInstancetype != null && i.getInstancetype.getClassid.nonEmpty) @@ -115,15 +154,15 @@ object DoiBoostMappingUtil { publication.getInstance().asScala.foreach(i => i.setInstancetype(instanceType.get.getInstancetype)) } - val item = retrieveHostedByItem(issn, eissn, lissn, hostedByMap) + publication.getInstance().asScala.foreach(i => { val hb = new KeyValue if (item != null) { - hb.setValue(item.officialName) + hb.setValue(item.officialname) hb.setKey(generateDSId(item.id)) if (item.openAccess) - i.setAccessright(createQualifier("OPEN", "dnet:access_modes")) - publication.setBestaccessright(createQualifier("OPEN", "dnet:access_modes")) + i.setAccessright(getOpenAccessQualifier()) + publication.setBestaccessright(getOpenAccessQualifier()) } else { hb.setValue("Unknown Repository") @@ -135,10 +174,10 @@ object DoiBoostMappingUtil { val ar = publication.getInstance().asScala.filter(i => i.getInstancetype != null && i.getAccessright!= null && i.getAccessright.getClassid!= null).map(f=> f.getAccessright.getClassid) if (ar.nonEmpty) { if(ar.contains("OPEN")){ - publication.setBestaccessright(createQualifier("OPEN", "dnet:access_modes")) + publication.setBestaccessright(getOpenAccessQualifier()) } else { - publication.setBestaccessright(createQualifier(ar.head, "dnet:access_modes")) + publication.setBestaccessright(getRestrictedQualifier()) } } publication @@ -298,6 +337,8 @@ object DoiBoostMappingUtil { } + + def createMAGCollectedFrom(): KeyValue = { val cf = new KeyValue diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala index 686d1438c..7a6cd3faa 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDOIBoostActionSet.scala @@ -2,10 +2,13 @@ package eu.dnetlib.doiboost import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.action.AtomicAction -import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset => OafDataset} import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.Text +import org.apache.hadoop.io.compress.GzipCodec +import org.apache.hadoop.mapred.SequenceFileOutputFormat import org.apache.spark.SparkConf -import org.apache.spark.sql.{ Encoder, Encoders, SaveMode, SparkSession} +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} object SparkGenerateDOIBoostActionSet { @@ -23,8 +26,10 @@ object SparkGenerateDOIBoostActionSet { .master(parser.get("master")).getOrCreate() implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] + implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization] implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation] + implicit val mapEncoderAS: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING) implicit val mapEncoderAtomiAction: Encoder[AtomicAction[OafDataset]] = Encoders.kryo[AtomicAction[OafDataset]] @@ -32,17 +37,23 @@ object SparkGenerateDOIBoostActionSet { val dbDatasetPath = parser.get("dbDatasetPath") val crossRefRelation = parser.get("crossRefRelation") val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath") + val dbOrganizationPath = parser.get("dbOrganizationPath") val workingDirPath = parser.get("targetPath") spark.read.load(dbDatasetPath).as[OafDataset] .map(d =>DoiBoostMappingUtil.fixResult(d)) .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) - .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") + .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/actionSet") spark.read.load(dbPublicationPath).as[Publication] .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") + spark.read.load(dbOrganizationPath).as[Organization] + .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) + .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") + + spark.read.load(crossRefRelation).as[Relation] .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING)) .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") @@ -52,15 +63,9 @@ object SparkGenerateDOIBoostActionSet { .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet") -// implicit val mapEncoderPub: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING) -// -// val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)] -// -// -// -// d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset_a8c2f90b-a3ae-4d6e-8187-47a437156e18_1590223414", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text,Text]], classOf[GzipCodec]) - + val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)] + d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec]) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala index 7f68b30d9..772af010f 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/SparkGenerateDoiBoost.scala @@ -1,7 +1,7 @@ package eu.dnetlib.doiboost import eu.dnetlib.dhp.application.ArgumentApplicationParser -import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset} +import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset, Organization} import eu.dnetlib.doiboost.mag.ConversionUtil import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf @@ -26,22 +26,13 @@ object SparkGenerateDoiBoost { .master(parser.get("master")).getOrCreate() import spark.implicits._ - val crossrefPublicationPath = parser.get("crossrefPublicationPath") - val crossrefDatasetPath = parser.get("crossrefDatasetPath") - val uwPublicationPath = parser.get("uwPublicationPath") - val magPublicationPath = parser.get("magPublicationPath") - val orcidPublicationPath = parser.get("orcidPublicationPath") + + val hostedByMapPath = parser.get("hostedByMapPath") val workingDirPath = parser.get("workingDirPath") -// logger.info("Phase 1) repartition and move all the dataset in a same working folder") -// spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication") -// spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s => s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset") -// spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication") -// spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication") -// spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication") - implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] + implicit val mapEncoderOrg: Encoder[Organization] = Encoders.kryo[Organization] implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub) implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation] @@ -75,23 +66,26 @@ object SparkGenerateDoiBoost { sj.joinWith(magPublication, sj("_1").equalTo(magPublication("_1")), "left").map(applyMerge).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublication") - val doiBoostPublication: Dataset[Publication] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication] + val doiBoostPublication: Dataset[(String,Publication)] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication].filter(p=>DoiBoostMappingUtil.filterPublication(p)).map(DoiBoostMappingUtil.toISSNPair)(tupleForJoinEncoder) - val map = DoiBoostMappingUtil.retrieveHostedByMap() + val hostedByDataset : Dataset[(String, HostedByItemType)] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath).map(DoiBoostMappingUtil.toHostedByItem)) - doiBoostPublication.filter(p=>DoiBoostMappingUtil.filterPublication(p)).map(p => DoiBoostMappingUtil.fixPublication(p, map)).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered") + doiBoostPublication.joinWith(hostedByDataset, doiBoostPublication("_1").equalTo(hostedByDataset("_1")), "left") + .map(DoiBoostMappingUtil.fixPublication) + .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered") val affiliationPath = parser.get("affiliationPath") val paperAffiliationPath = parser.get("paperAffiliationPath") - val affiliation = spark.read.load(affiliationPath).where(col("GridId").isNotNull).select(col("AffiliationId"), col("GridId")) + val affiliation = spark.read.load(affiliationPath).select(col("AffiliationId"), col("GridId"), col("OfficialPage"), col("DisplayName")) val paperAffiliation = spark.read.load(paperAffiliationPath).select(col("AffiliationId").alias("affId"), col("PaperId")) val a:Dataset[DoiBoostAffiliation] = paperAffiliation - .joinWith(affiliation, paperAffiliation("affId").equalTo(affiliation("AffiliationId"))).select(col("_1.PaperId"), col("_2.AffiliationId"), col("_2.GridId")).as[DoiBoostAffiliation] + .joinWith(affiliation, paperAffiliation("affId").equalTo(affiliation("AffiliationId"))) + .select(col("_1.PaperId"), col("_2.AffiliationId"), col("_2.GridId"), col("_2.OfficialPage"), col("_2.DisplayName")).as[DoiBoostAffiliation] @@ -102,24 +96,45 @@ object SparkGenerateDoiBoost { magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).flatMap(item => { val pub:Publication = item._1._2 val affiliation = item._2 + val affId:String = if (affiliation.GridId.isDefined) DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId.get) else DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString) val r:Relation = new Relation r.setSource(pub.getId) - r.setTarget(DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId)) + r.setTarget(affId) r.setRelType("resultOrganization") r.setRelClass("hasAuthorInstitution") r.setSubRelType("affiliation") r.setDataInfo(pub.getDataInfo) - r.setCollectedfrom(pub.getCollectedfrom) + r.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava) val r1:Relation = new Relation r1.setTarget(pub.getId) - r1.setSource(DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId)) + r1.setSource(affId) r1.setRelType("resultOrganization") r1.setRelClass("isAuthorInstitutionOf") r1.setSubRelType("affiliation") r1.setDataInfo(pub.getDataInfo) - r1.setCollectedfrom(pub.getCollectedfrom) + r1.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava) List(r, r1) })(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation") - } + + + magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).map( item => { + val affiliation = item._2 + if (affiliation.GridId.isEmpty) { + val o = new Organization + o.setCollectedfrom(List(DoiBoostMappingUtil.createMAGCollectedFrom()).asJava) + o.setDataInfo(DoiBoostMappingUtil.generateDataInfo()) + o.setId(DoiBoostMappingUtil.generateMAGAffiliationId(affiliation.AffiliationId.toString)) + o.setOriginalId(List(affiliation.AffiliationId.toString).asJava) + if (affiliation.DisplayName.nonEmpty) + o.setLegalname(DoiBoostMappingUtil.asField(affiliation.DisplayName.get)) + if (affiliation.OfficialPage.isDefined) + o.setWebsiteurl(DoiBoostMappingUtil.asField(affiliation.OfficialPage.get)) + o.setCountry(DoiBoostMappingUtil.getUnknownCountry()) + o + } + else + null + }).filter(o=> o!=null).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostOrganization") + } } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala index 731821892..cc2c9d586 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/crossref/Crossref2Oaf.scala @@ -106,7 +106,9 @@ case object Crossref2Oaf { // Publisher ( Name of work's publisher mapped into Result/Publisher) val publisher = (json \ "publisher").extractOrElse[String](null) - result.setPublisher(asField(publisher)) + if (publisher!= null && publisher.nonEmpty) + result.setPublisher(asField(publisher)) + // TITLE val mainTitles = for {JString(title) <- json \ "title" if title.nonEmpty} yield createSP(title, "main title", "dnet:dataCite_title") @@ -120,7 +122,7 @@ case object Crossref2Oaf { result.setDescription(descriptionList.asJava) // Source - val sourceList = for {JString(source) <- json \ "source" if source.nonEmpty} yield asField(source) + val sourceList = for {JString(source) <- json \ "source" if source!= null && source.nonEmpty} yield asField(source) result.setSource(sourceList.asJava) //RELEVANT DATE Mapping @@ -168,7 +170,7 @@ case object Crossref2Oaf { instance.setRefereed(asField("peerReviewed")) - instance.setAccessright(createQualifier("RESTRICTED", "dnet:access_modes")) + instance.setAccessright(getRestrictedQualifier()) result.setInstance(List(instance).asJava) instance.setInstancetype(createQualifier(cobjCategory.substring(0, 4), cobjCategory.substring(5), "dnet:publication_resource", "dnet:publication_resource")) result.setResourcetype(createQualifier(cobjCategory.substring(0, 4),"dnet:dataCite_resource")) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala index 65d2a7c51..1c6e1b0e6 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -115,7 +115,7 @@ case object ConversionUtil { j.setName(ci.DisplayName.get) if (ci.StartDate.isDefined && ci.EndDate.isDefined) { - j.setConferencedate(s"${ci.StartDate.get.toString} - ${ci.EndDate.get.toString}") + j.setConferencedate(s"${ci.StartDate.get.toString.substring(0,10)} - ${ci.EndDate.get.toString.substring(0,10)}") } publication.setJournal(j) @@ -214,7 +214,7 @@ case object ConversionUtil { if (paper.Date != null && paper.Date.isDefined) { - pub.setDateofacceptance(asField(paper.Date.get.toString)) + pub.setDateofacceptance(asField(paper.Date.get.toString.substring(0,10))) } pub.setPublisher(asField(paper.Publisher)) @@ -280,7 +280,7 @@ case object ConversionUtil { if (paper.Date != null) { - pub.setDateofacceptance(asField(paper.Date.toString)) + pub.setDateofacceptance(asField(paper.Date.toString.substring(0,10))) } pub.setAuthor(authorsOAF.asJava) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala index eeec1af35..a72e4b0d6 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/SparkMapUnpayWallToOAF.scala @@ -27,7 +27,7 @@ object SparkMapUnpayWallToOAF { .appName(getClass.getSimpleName) .master(parser.get("master")).getOrCreate() - implicit val mapEncoderPubs: Encoder[Publication] = Encoders.bean(classOf[Publication]) + implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication] val sourcePath = parser.get("sourcePath") @@ -35,7 +35,8 @@ object SparkMapUnpayWallToOAF { val inputRDD:RDD[String] = spark.sparkContext.textFile(s"$sourcePath") logger.info("Converting UnpayWall to OAF") - val d:Dataset[Publication] = spark.createDataset(inputRDD.repartition(1000).map(UnpayWallToOAF.convertToOAF).filter(p=>p!=null)).as[Publication] + + val d:Dataset[Publication] = spark.createDataset(inputRDD.map(UnpayWallToOAF.convertToOAF).filter(p=>p!=null)).as[Publication] d.write.mode(SaveMode.Overwrite).save(targetPath) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala index ca104e2da..08cd4ee8e 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/uw/UnpayWallToOAF.scala @@ -46,7 +46,7 @@ object UnpayWallToOAF { val i :Instance= new Instance() i.setCollectedfrom(createUnpayWallCollectedFrom()) - i.setAccessright(createQualifier("OPEN", "dnet:access_modes")) + i.setAccessright(getOpenAccessQualifier()) i.setUrl(List(oaLocation.url.get).asJava) if (oaLocation.license.isDefined) diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_as_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_as_params.json index 76059665f..6eb1ec6f1 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_as_params.json +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_as_params.json @@ -1,8 +1,9 @@ [ - {"paramName": "m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}, - {"paramName": "dp", "paramLongName":"dbPublicationPath", "paramDescription": "the Crossref Publication Path", "paramRequired": true}, - {"paramName": "dd", "paramLongName":"dbDatasetPath", "paramDescription": "the Crossref Dataset Path", "paramRequired": true}, - {"paramName": "cr", "paramLongName":"crossRefRelation", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true}, + {"paramName": "m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}, + {"paramName": "dp", "paramLongName":"dbPublicationPath", "paramDescription": "the Crossref Publication Path", "paramRequired": true}, + {"paramName": "dd", "paramLongName":"dbDatasetPath", "paramDescription": "the Crossref Dataset Path", "paramRequired": true}, + {"paramName": "cr", "paramLongName":"crossRefRelation", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true}, {"paramName": "da", "paramLongName":"dbaffiliationRelationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true}, - {"paramName": "w", "paramLongName":"targetPath", "paramDescription": "the Working Path", "paramRequired": true} + {"paramName": "do", "paramLongName":"dbOrganizationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true}, + {"paramName": "w", "paramLongName":"targetPath", "paramDescription": "the Working Path", "paramRequired": true} ] diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json index d5ef3726b..ea08f47d4 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/generate_doiboost_params.json @@ -1,10 +1,6 @@ [ {"paramName": "m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}, - {"paramName": "cp", "paramLongName":"crossrefPublicationPath", "paramDescription": "the Crossref Publication Path", "paramRequired": true}, - {"paramName": "cd", "paramLongName":"crossrefDatasetPath", "paramDescription": "the Crossref Dataset Path", "paramRequired": true}, - {"paramName": "up", "paramLongName":"uwPublicationPath", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true}, - {"paramName": "mp", "paramLongName":"magPublicationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true}, - {"paramName": "op", "paramLongName":"orcidPublicationPath", "paramDescription": "the ORCID Publication Path", "paramRequired": true}, + {"paramName": "hb", "paramLongName":"hostedByMapPath", "paramDescription": "the hosted By Map Path", "paramRequired": true}, {"paramName": "ap", "paramLongName":"affiliationPath", "paramDescription": "the Affliation Path", "paramRequired": true}, {"paramName": "pa", "paramLongName":"paperAffiliationPath", "paramDescription": "the paperAffiliation Path", "paramRequired": true}, {"paramName": "w", "paramLongName":"workingDirPath", "paramDescription": "the Working Path", "paramRequired": true} diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml index a92253c67..34ba5d89d 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml @@ -1,37 +1,17 @@ - crossrefPublicationPath - the Crossref Publication Path + hostedByMapPath + the Hosted By Map Path - - crossrefDatasetPath - the Crossref Dataset Path - - - uwPublicationPath - the UnpayWall Publication Path - - - magPublicationPath - the MAG Publication Path - - - orcidPublicationPath - the ORCID Publication Path - - affiliationPath the Affliation Path - paperAffiliationPath the paperAffiliation Path - - workingDirPath the Working Path @@ -52,7 +32,7 @@ - + @@ -82,17 +62,13 @@ --conf spark.sql.shuffle.partitions=3840 ${sparkExtraOPT} - --crossrefPublicationPath${crossrefPublicationPath} - --crossrefDatasetPath${crossrefDatasetPath} - --uwPublicationPath${uwPublicationPath} - --magPublicationPath${magPublicationPath} - --orcidPublicationPath${orcidPublicationPath} + --hostedByMapPath${hostedByMapPath} --affiliationPath${affiliationPath} --paperAffiliationPath${paperAffiliationPath} --workingDirPath${workingDirPath} --masteryarn-cluster - + @@ -115,6 +91,7 @@ --dbDatasetPath${workingDirPath}/crossrefDataset --crossRefRelation/data/doiboost/input/crossref/relations --dbaffiliationRelationPath${workingDirPath}/doiBoostPublicationAffiliation + -do${workingDirPath}/doiBoostOrganization --targetPath${workingDirPath}/actionDataSet --masteryarn-cluster diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala index d8a9a0313..4912648be 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/dhp/doiboost/DoiBoostHostedByMapTest.scala @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.doiboost import eu.dnetlib.dhp.schema.oaf.{Publication, Dataset => OafDataset} -import eu.dnetlib.doiboost.DoiBoostMappingUtil +import eu.dnetlib.doiboost.{DoiBoostMappingUtil, HostedByItemType} import eu.dnetlib.doiboost.SparkGenerateDoiBoost.getClass import eu.dnetlib.doiboost.mag.ConversionUtil import eu.dnetlib.doiboost.orcid.ORCIDElement @@ -15,45 +15,44 @@ import scala.io.Source class DoiBoostHostedByMapTest { - @Test - def testLoadMap(): Unit = { - println(DoiBoostMappingUtil.retrieveHostedByMap().keys.size) - } - - - @Test - def testMerge():Unit = { - val conf: SparkConf = new SparkConf() - val spark: SparkSession = - SparkSession - .builder() - .config(conf) - .appName(getClass.getSimpleName) - .master("local[*]").getOrCreate() - - - - implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] - implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] - implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub) - - - import spark.implicits._ - val dataset:Dataset[ORCIDElement] = spark.read.json("/home/sandro/orcid").as[ORCIDElement] - - - dataset.show(false) - - - - - - - - - } +// @Test +// def testMerge():Unit = { +// val conf: SparkConf = new SparkConf() +// val spark: SparkSession = +// SparkSession +// .builder() +// .config(conf) +// .appName(getClass.getSimpleName) +// .master("local[*]").getOrCreate() +// +// +// +// implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication] +// implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset] +// implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub) +// +// +// import spark.implicits._ +// val dataset:RDD[String]= spark.sparkContext.textFile("/home/sandro/Downloads/hbMap.gz") +// +// +// val hbMap:Dataset[(String, HostedByItemType)] =spark.createDataset(dataset.map(DoiBoostMappingUtil.toHostedByItem)) +// +// +// hbMap.show() +// +// +// +// +// +// +// +// +// +// +// } @Test diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala index 1567e0008..88b1669f4 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/mag/MAGMappingTest.scala @@ -1,5 +1,7 @@ package eu.dnetlib.doiboost.mag +import java.sql.Timestamp + import eu.dnetlib.dhp.schema.oaf.Publication import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature import org.apache.spark.SparkConf @@ -39,6 +41,17 @@ class MAGMappingTest { + @Test + def testDate() :Unit = { + + val p:Timestamp = Timestamp.valueOf("2011-10-02 00:00:00") + + println(p.toString.substring(0,10)) + + } + + + @Test def buildInvertedIndexTest(): Unit = { val json_input = Source.fromInputStream(getClass.getResourceAsStream("invertedIndex.json")).mkString