1
0
Fork 0

Added Action set generation for the MAG organization

This commit is contained in:
Sandro La Bruzzo 2024-04-16 13:39:15 +02:00
parent 41a42dde64
commit a5ddd8dfbb
4 changed files with 122 additions and 15 deletions

View File

@ -0,0 +1,21 @@
[
{
"paramName": "m",
"paramLongName": "master",
"paramDescription": "the master name",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "The as output Path",
"paramRequired": true
},
{
"paramName": "ma",
"paramLongName": "magBasePath",
"paramDescription": "The mag Base path",
"paramRequired": false
}
]

View File

@ -1,19 +1,11 @@
package eu.dnetlib.dhp.collection.mag
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.common.ModelConstants
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils._
import eu.dnetlib.dhp.schema.oaf.utils.{OafMapperUtils, PidType}
import eu.dnetlib.dhp.schema.oaf.{
Author,
DataInfo,
Instance,
Journal,
Publication,
Relation,
Result,
Dataset => OafDataset
}
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, Journal, Organization, Publication, Relation, Result, Dataset => OafDataset}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataset, Row, SparkSession}
@ -121,7 +113,7 @@ object MagUtility extends Serializable {
"DateTime" -> DateType
)
val stream = Map(
val stream: Map[String, (String, Seq[String])] = Map(
"Affiliations" -> Tuple2(
"mag/Affiliations.txt",
Seq(
@ -351,7 +343,7 @@ object MagUtility extends Serializable {
def getSchema(streamName: String): StructType = {
var schema = new StructType()
val d: Seq[String] = stream(streamName)._2
d.foreach { case t =>
d.foreach { t =>
val currentType = t.split(":")
val fieldName: String = currentType.head
var fieldType: String = currentType.last
@ -686,6 +678,58 @@ object MagUtility extends Serializable {
}
def generateOrganization(r: Row): String = {
val o = new Organization
val affId = s"20|mag_________::${DHPUtils.md5(r.getAs[Long]("AffiliationId").toString)}"
o.setId(affId)
o.setDataInfo(MAGDataInfo)
o.setCollectedfrom(List(MAGCollectedFrom).asJava)
o.setLegalname(field(r.getAs[String]("DisplayName"), null))
val gid = r.getAs[String]("GridId")
if (gid != null) {
o.setPid(List(
structuredProperty(gid, qualifier(
PidType.GRID.toString,
PidType.GRID.toString,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES
),
null),
structuredProperty(r.getAs[Long]("AffiliationId").toString, qualifier(
PidType.mag_id.toString,
PidType.mag_id.toString,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES
),
null)
).asJava)
} else {
o.setPid(List(
structuredProperty(r.getAs[Long]("AffiliationId").toString, qualifier(
PidType.mag_id.toString,
PidType.mag_id.toString,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES
),
null)
).asJava)
}
val c = r.getAs[String]("Iso3166Code")
if (c != null)
o.setCountry(qualifier(c, c, "dnet:countries", "dnet:countries"))
else
o.setCountry(ModelConstants.UNKNOWN_COUNTRY)
val ws = r.getAs[String]("OfficialPage")
if (ws != null)
o.setWebsiteurl(field(ws, null))
val a = new AtomicAction[Organization]()
a.setClazz(classOf[Organization])
a.setPayload(o)
mapper.writeValueAsString(a)
}
def generateAffiliationRelations(paperAffiliation: Row): List[Relation] = {
val affId = s"20|mag_________::${DHPUtils.md5(paperAffiliation.getAs[Long]("AffiliationId").toString)}"

View File

@ -0,0 +1,42 @@
package eu.dnetlib.dhp.collection.mag
import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.Organization
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
class SparkMagOrganizationAS (propertyPath: String, args: Array[String], log: Logger)
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
/** Here all the spark applications runs this method
* where the whole logic of the spark node is defined
*/
override def run(): Unit = {
val magBasePath:String = parser.get("magBasePath")
log.info(s"magBasePath is $magBasePath")
val outputPath:String = parser.get("outputPath")
log.info(s"outputPath is $outputPath")
generateAS(spark,magBasePath, outputPath)
}
def generateAS(spark:SparkSession, magBasePath:String,outputPath:String ):Unit = {
import spark.implicits._
val organizations = MagUtility.loadMagEntity(spark,"Affiliations", magBasePath)
organizations.map(r => MagUtility.generateOrganization(r)).write.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.text(outputPath)
}
}
object SparkMagOrganizationAS{
val log: Logger = LoggerFactory.getLogger(SparkMagOrganizationAS.getClass)
def main(args: Array[String]): Unit = {
new SparkMagOrganizationAS("/eu/dnetlib/dhp/collection/mag/create_organization_AS.json", args, log)
.initialize()
.run()
}
}

View File

@ -10,6 +10,7 @@ class MAGMappingTest {
val mapper = new ObjectMapper()
def mappingTest(): Unit = {
val spark = SparkSession
@ -18,10 +19,9 @@ class MAGMappingTest {
.master("local[*]")
.getOrCreate()
val s = new SparkMAGtoOAF(null, null, null)
val s = new SparkMagOrganizationAS(null, null, null)
s.convertMAG(spark, "/home/sandro/Downloads/mag_test", "/home/sandro/Downloads/mag_oaf")
s.generateAffiliations(spark, "/home/sandro/Downloads/mag_test", "/home/sandro/Downloads/mag_oaf")
s.generateAS(spark, "/home/sandro/Downloads/mag_test", "/home/sandro/Downloads/mag_AS")
}