1
0
Fork 0

implemented generation of ActionSet

This commit is contained in:
Sandro La Bruzzo 2020-05-26 09:15:33 +02:00
parent 2408083566
commit 25f52e19a4
8 changed files with 484 additions and 198 deletions

View File

@ -1,10 +1,14 @@
package eu.dnetlib.doiboost
import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, Instance, KeyValue, Publication, Qualifier, Result, StructuredProperty}
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.lang3.StringUtils
import org.codehaus.jackson.map.ObjectMapper
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
import scala.io.Source
@ -12,8 +16,12 @@ import scala.io.Source
case class HostedByItemType(id: String, officialName: String, issn: String, eissn: String, lissn: String, openAccess: Boolean) {}
case class DoiBoostAffiliation(PaperId:Long, AffiliationId:Long, GridId:String){}
object DoiBoostMappingUtil {
val logger: Logger = LoggerFactory.getLogger(getClass)
//STATIC STRING
val MAG = "microsoft"
val MAG_NAME = "Microsoft Academic Graph"
@ -30,6 +38,31 @@ object DoiBoostMappingUtil {
val invalidName = List(",", "none none", "none, none", "none &na;", "(:null)", "test test test", "test test", "test", "&na; &na;")
def toActionSet(item:Oaf) :(String, String) = {
val mapper = new ObjectMapper()
item match {
case dataset: Dataset =>
val a: AtomicAction[Dataset] = new AtomicAction[Dataset]
a.setClazz(classOf[Dataset])
a.setPayload(dataset)
(dataset.getClass.getCanonicalName, mapper.writeValueAsString(a))
case publication: Publication =>
val a: AtomicAction[Publication] = new AtomicAction[Publication]
a.setClazz(classOf[Publication])
a.setPayload(publication)
(publication.getClass.getCanonicalName, mapper.writeValueAsString(a))
case relation: Relation =>
val a: AtomicAction[Relation] = new AtomicAction[Relation]
a.setClazz(classOf[Relation])
a.setPayload(relation)
(relation.getClass.getCanonicalName, mapper.writeValueAsString(a))
case _ =>
null
}
}
def retrieveHostedByMap(): Map[String, HostedByItemType] = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
@ -52,35 +85,74 @@ object DoiBoostMappingUtil {
}
def generateGridAffiliationId(gridId:String) :String = {
s"10|grid________::${DHPUtils.md5(gridId.toLowerCase().trim())}"
}
def fixResult(result: Dataset) :Dataset = {
val instanceType = result.getInstance().asScala.find(i => i.getInstancetype != null && i.getInstancetype.getClassid.nonEmpty)
if (instanceType.isDefined) {
result.getInstance().asScala.foreach(i => i.setInstancetype(instanceType.get.getInstancetype))
}
result.getInstance().asScala.foreach(i => {
val hb = new KeyValue
hb.setValue("Unknown Repository")
hb.setKey(s"10|$OPENAIRE_PREFIX::55045bd2a65019fd8e6741a755395c8c")
i.setHostedby(hb)
})
result
}
def fixPublication(publication: Publication, hostedByMap: Map[String, HostedByItemType]): Publication = {
if (publication.getJournal == null)
return publication
val issn = if (publication.getJournal == null) null else publication.getJournal.getIssnPrinted
val eissn =if (publication.getJournal == null) null else publication.getJournal.getIssnOnline
val lissn =if (publication.getJournal == null) null else publication.getJournal.getIssnLinking
val issn = publication.getJournal.getIssnPrinted
val eissn = publication.getJournal.getIssnOnline
val lissn = publication.getJournal.getIssnLinking
val instanceType = publication.getInstance().asScala.find(i => i.getInstancetype != null && i.getInstancetype.getClassid.nonEmpty)
if (instanceType.isDefined) {
publication.getInstance().asScala.foreach(i => i.setInstancetype(instanceType.get.getInstancetype))
}
val item = retrieveHostedByItem(issn, eissn, lissn, hostedByMap)
if (item!= null) {
val l = publication.getInstance().asScala.map(i =>{
publication.getInstance().asScala.foreach(i => {
val hb = new KeyValue
if (item != null) {
hb.setValue(item.officialName)
hb.setKey (s"10|${item.id}" )
i.setHostedby(hb)
hb.setKey(generateDSId(item.id))
if (item.openAccess)
i.setAccessright(createQualifier("Open", "dnet:access_modes"))
i
}).asJava
publication.setBestaccessright(createQualifier("Open", "dnet:access_modes"))
}
else {
hb.setValue("Unknown Repository")
hb.setKey(s"10|$OPENAIRE_PREFIX::55045bd2a65019fd8e6741a755395c8c")
}
i.setHostedby(hb)
})
publication.setInstance(l)
val ar = publication.getInstance().asScala.filter(i => i.getInstancetype != null && i.getAccessright!= null && i.getAccessright.getClassid!= null).map(f=> f.getAccessright.getClassid)
if (ar.nonEmpty) {
if(ar.contains("Open")){
publication.setBestaccessright(createQualifier("Open", "dnet:access_modes"))
}
else {
publication.setBestaccessright(createQualifier(ar.head, "dnet:access_modes"))
}
}
publication
}
def generateDSId(input: String): String = {
val b = StringUtils.substringBefore(input, "::")
val a = StringUtils.substringAfter(input, "::")
s"10|${b}::${DHPUtils.md5(a)}"
}
def generateDataInfo(): DataInfo = {
generateDataInfo("0.9")
}
@ -229,7 +301,7 @@ def generateDataInfo (): DataInfo = {
def createMAGCollectedFrom(): KeyValue = {
val cf = new KeyValue
cf.setValue (MAG)
cf.setValue(MAG_NAME)
cf.setKey("10|" + OPENAIRE_PREFIX + SEPARATOR + DHPUtils.md5(MAG))
cf

View File

@ -0,0 +1,80 @@
package eu.dnetlib.doiboost
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import org.apache.hadoop.io.Text
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.{SequenceFileOutputFormat, TextOutputFormat}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
object SparkGenerateDOIBoostActionSet {
val logger: Logger = LoggerFactory.getLogger(getClass)
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/generate_doiboost_as_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
// implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
// implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
// implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]
//
// implicit val mapEncoderAtomiAction: Encoder[AtomicAction[OafDataset]] = Encoders.kryo[AtomicAction[OafDataset]]
//
//
//
// val dbPublicationPath = parser.get("dbPublicationPath")
// val dbDatasetPath = parser.get("dbDatasetPath")
// val crossRefRelation = parser.get("crossRefRelation")
// val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath")
val workingDirPath = parser.get("targetPath")
//
// spark.read.load(dbDatasetPath).as[OafDataset]
// .map(d =>DoiBoostMappingUtil.fixResult(d))
// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
//
// spark.read.load(dbPublicationPath).as[Publication]
// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
//
// spark.read.load(crossRefRelation).as[Relation]
// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
//
// spark.read.load(dbaffiliationRelationPath).as[Relation]
// .map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
implicit val mapEncoderPub: Encoder[(String, String)] = Encoders.tuple(Encoders.STRING, Encoders.STRING)
val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)]
SequenceFileOutputFormat
d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset_a8c2f90b-a3ae-4d6e-8187-47a437156e18_1590223414", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text,Text]], classOf[GzipCodec])
}
}

View File

@ -1,11 +1,14 @@
package eu.dnetlib.doiboost
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Publication, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.doiboost.mag.ConversionUtil
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkGenerateDoiBoost {
@ -22,7 +25,7 @@ object SparkGenerateDoiBoost {
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
import spark.implicits._
val crossrefPublicationPath = parser.get("crossrefPublicationPath")
val crossrefDatasetPath = parser.get("crossrefDatasetPath")
val uwPublicationPath = parser.get("uwPublicationPath")
@ -32,15 +35,16 @@ object SparkGenerateDoiBoost {
logger.info("Phase 1) repartition and move all the dataset in a same working folder")
// spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication")
// spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s => s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset")
// spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication")
// spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication")
// spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication")
spark.read.load(crossrefPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefPublication")
spark.read.load(crossrefDatasetPath).as(Encoders.bean(classOf[OafDataset])).map(s => s)(Encoders.kryo[OafDataset]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/crossrefDataset")
spark.read.load(uwPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/uwPublication")
spark.read.load(orcidPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/orcidPublication")
spark.read.load(magPublicationPath).as(Encoders.bean(classOf[Publication])).map(s => s)(Encoders.kryo[Publication]).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/magPublication")
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
implicit val mapEncoderRel: Encoder[Relation] = Encoders.kryo[Relation]
logger.info("Phase 2) Join Crossref with UnpayWall")
@ -73,11 +77,49 @@ object SparkGenerateDoiBoost {
val doiBoostPublication: Dataset[Publication] = spark.read.load(s"$workingDirPath/doiBoostPublication").as[Publication]
doiBoostPublication.filter(p=>DoiBoostMappingUtil.filterPublication(p)).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered")
val map = DoiBoostMappingUtil.retrieveHostedByMap()
doiBoostPublication.filter(p=>DoiBoostMappingUtil.filterPublication(p)).map(p => DoiBoostMappingUtil.fixPublication(p, map)).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationFiltered")
val affiliationPath = parser.get("affiliationPath")
val paperAffiliationPath = parser.get("paperAffiliationPath")
val affiliation = spark.read.load(affiliationPath).where(col("GridId").isNotNull).select(col("AffiliationId"), col("GridId"))
val paperAffiliation = spark.read.load(paperAffiliationPath).select(col("AffiliationId").alias("affId"), col("PaperId"))
val a:Dataset[DoiBoostAffiliation] = paperAffiliation
.joinWith(affiliation, paperAffiliation("affId").equalTo(affiliation("AffiliationId"))).select(col("_1.PaperId"), col("_2.AffiliationId"), col("_2.GridId")).as[DoiBoostAffiliation]
val magPubs:Dataset[(String,Publication)]= spark.read.load(s"$workingDirPath/doiBoostPublicationFiltered").as[Publication]
.map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p))(tupleForJoinEncoder).filter(s =>s._1!= null )
magPubs.joinWith(a,magPubs("_1").equalTo(a("PaperId"))).flatMap(item => {
val pub:Publication = item._1._2
val affiliation = item._2
val r:Relation = new Relation
r.setSource(pub.getId)
r.setTarget(DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId))
r.setRelType("resultOrganization")
r.setRelClass("hasAuthorInstitution")
r.setSubRelType("affiliation")
r.setDataInfo(pub.getDataInfo)
r.setCollectedfrom(pub.getCollectedfrom)
val r1:Relation = new Relation
r1.setTarget(pub.getId)
r1.setSource(DoiBoostMappingUtil.generateGridAffiliationId(affiliation.GridId))
r1.setRelType("resultOrganization")
r1.setRelClass("isAuthorInstitutionOf")
r1.setSubRelType("affiliation")
r1.setDataInfo(pub.getDataInfo)
r1.setCollectedfrom(pub.getCollectedfrom)
List(r, r1)
})(mapEncoderRel).write.mode(SaveMode.Overwrite).save(s"$workingDirPath/doiBoostPublicationAffiliation")
}
}

View File

@ -0,0 +1,8 @@
[
{"paramName": "m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true},
{"paramName": "dp", "paramLongName":"dbPublicationPath", "paramDescription": "the Crossref Publication Path", "paramRequired": true},
{"paramName": "dd", "paramLongName":"dbDatasetPath", "paramDescription": "the Crossref Dataset Path", "paramRequired": true},
{"paramName": "cr", "paramLongName":"crossRefRelation", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true},
{"paramName": "da", "paramLongName":"dbaffiliationRelationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true},
{"paramName": "w", "paramLongName":"targetPath", "paramDescription": "the Working Path", "paramRequired": true}
]

View File

@ -5,5 +5,7 @@
{"paramName": "up", "paramLongName":"uwPublicationPath", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true},
{"paramName": "mp", "paramLongName":"magPublicationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true},
{"paramName": "op", "paramLongName":"orcidPublicationPath", "paramDescription": "the ORCID Publication Path", "paramRequired": true},
{"paramName": "ap", "paramLongName":"affiliationPath", "paramDescription": "the Affliation Path", "paramRequired": true},
{"paramName": "pa", "paramLongName":"paperAffiliationPath", "paramDescription": "the paperAffiliation Path", "paramRequired": true},
{"paramName": "w", "paramLongName":"workingDirPath", "paramDescription": "the Working Path", "paramRequired": true}
]

View File

@ -20,6 +20,18 @@
<name>orcidPublicationPath</name>
<description>the ORCID Publication Path</description>
</property>
<property>
<name>affiliationPath</name>
<description>the Affliation Path</description>
</property>
<property>
<name>paperAffiliationPath</name>
<description>the paperAffiliation Path</description>
</property>
<property>
<name>workingDirPath</name>
<description>the Working Path</description>
@ -40,7 +52,7 @@
<start to="CreateDOIBoost"/>
<start to="GenerateActionSet"/>
<kill name="Kill">
@ -75,36 +87,40 @@
<arg>--uwPublicationPath</arg><arg>${uwPublicationPath}</arg>
<arg>--magPublicationPath</arg><arg>${magPublicationPath}</arg>
<arg>--orcidPublicationPath</arg><arg>${orcidPublicationPath}</arg>
<arg>--affiliationPath</arg><arg>${affiliationPath}</arg>
<arg>--paperAffiliationPath</arg><arg>${paperAffiliationPath}</arg>
<arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="GenerateActionSet"/>
<error to="Kill"/>
</action>
<action name="GenerateActionSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Generate DOIBoost ActionSet</name>
<class>eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--dbPublicationPath</arg><arg>${workingDirPath}/doiBoostPublicationFiltered</arg>
<arg>--dbDatasetPath</arg><arg>${workingDirPath}/crossrefDataset</arg>
<arg>--crossRefRelation</arg><arg>/data/doiboost/input/crossref/relations</arg>
<arg>--dbaffiliationRelationPath</arg><arg>${workingDirPath}/doiBoostPublicationAffiliation</arg>
<arg>--targetPath</arg><arg>${workingDirPath}/actionDataSet</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<!-- <action name="PreprocessMag">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn-cluster</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>Convert Mag to Dataset</name>-->
<!-- <class>eu.dnetlib.doiboost.mag.SparkPreProcessMAG</class>-->
<!-- <jar>dhp-doiboost-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.sql.shuffle.partitions=3840-->
<!-- ${sparkExtraOPT}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;sourcePath</arg><arg>${sourcePath}</arg>-->
<!-- <arg>&#45;&#45;targetPath</arg><arg>${targetPath}</arg>-->
<!-- <arg>&#45;&#45;master</arg><arg>yarn-cluster</arg>-->
<!-- </spark>-->
<!-- <ok to="End"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<end name="End"/>
</workflow-app>

View File

@ -1,7 +1,11 @@
package eu.dnetlib.dhp.doiboost
import eu.dnetlib.dhp.schema.oaf.{Publication, Dataset => OafDataset}
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import eu.dnetlib.doiboost.SparkGenerateDoiBoost.getClass
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.junit.jupiter.api.Test
class DoiBoostHostedByMapTest {
@ -13,4 +17,46 @@ class DoiBoostHostedByMapTest {
}
@Test
def testFilter():Unit = {
val conf: SparkConf = new SparkConf()
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master("local[*]").getOrCreate()
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
implicit val mapEncoderDataset: Encoder[OafDataset] = Encoders.kryo[OafDataset]
implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
val pub =spark.read.load("/data/doiboost/doiboostPublicationFiltered").as[Publication]
val mapper = new ObjectMapper()
val map = DoiBoostMappingUtil.retrieveHostedByMap()
println(pub.map(p => DoiBoostMappingUtil.fixPublication(p, map)).count())
}
@Test
def idDSGeneration():Unit = {
val s ="doajarticles::0066-782X"
println(DoiBoostMappingUtil.generateDSId(s))
}
}

File diff suppressed because one or more lines are too long