From a5ddd8dfbb68e6d9cabf02ee706b15b64bc9f1c3 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 16 Apr 2024 13:39:15 +0200 Subject: [PATCH] Added Action set generation for the MAG organization --- .../mag/create_organization_AS.json | 21 ++++++ .../dhp/collection/mag/MagUtility.scala | 68 +++++++++++++++---- .../mag/SparkMagOrganizationAS.scala | 42 ++++++++++++ .../dhp/collection/mag/MAGMappingTest.scala | 6 +- 4 files changed, 122 insertions(+), 15 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_organization_AS.json create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMagOrganizationAS.scala diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_organization_AS.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_organization_AS.json new file mode 100644 index 000000000..8efa3fd88 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/create_organization_AS.json @@ -0,0 +1,21 @@ +[ + { + "paramName": "m", + "paramLongName": "master", + "paramDescription": "the master name", + "paramRequired": true + }, + { + "paramName": "o", + "paramLongName": "outputPath", + "paramDescription": "The action set output path", + "paramRequired": true + }, + { + "paramName": "ma", + "paramLongName": "magBasePath", + "paramDescription": "The MAG base path", + "paramRequired": true + } + +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala index 1d09121cc..48cb3276a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala @@ 
-1,19 +1,11 @@ package eu.dnetlib.dhp.collection.mag import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.schema.action.AtomicAction import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils._ import eu.dnetlib.dhp.schema.oaf.utils.{OafMapperUtils, PidType} -import eu.dnetlib.dhp.schema.oaf.{ - Author, - DataInfo, - Instance, - Journal, - Publication, - Relation, - Result, - Dataset => OafDataset -} +import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, Journal, Organization, Publication, Relation, Result, Dataset => OafDataset} import eu.dnetlib.dhp.utils.DHPUtils import org.apache.spark.sql.types._ import org.apache.spark.sql.{Dataset, Row, SparkSession} @@ -121,7 +113,7 @@ object MagUtility extends Serializable { "DateTime" -> DateType ) - val stream = Map( + val stream: Map[String, (String, Seq[String])] = Map( "Affiliations" -> Tuple2( "mag/Affiliations.txt", Seq( @@ -351,7 +343,7 @@ object MagUtility extends Serializable { def getSchema(streamName: String): StructType = { var schema = new StructType() val d: Seq[String] = stream(streamName)._2 - d.foreach { case t => + d.foreach { t => val currentType = t.split(":") val fieldName: String = currentType.head var fieldType: String = currentType.last @@ -686,6 +678,58 @@ object MagUtility extends Serializable { } + def generateOrganization(r: Row): String = { + + val o = new Organization + val affId = s"20|mag_________::${DHPUtils.md5(r.getAs[Long]("AffiliationId").toString)}" + o.setId(affId) + o.setDataInfo(MAGDataInfo) + o.setCollectedfrom(List(MAGCollectedFrom).asJava) + o.setLegalname(field(r.getAs[String]("DisplayName"), null)) + val gid = r.getAs[String]("GridId") + if (gid != null) { + o.setPid(List( + structuredProperty(gid, qualifier( + PidType.GRID.toString, + PidType.GRID.toString, + ModelConstants.DNET_PID_TYPES, + ModelConstants.DNET_PID_TYPES + ), + null), + structuredProperty(r.getAs[Long]("AffiliationId").toString, 
qualifier( + PidType.mag_id.toString, + PidType.mag_id.toString, + ModelConstants.DNET_PID_TYPES, + ModelConstants.DNET_PID_TYPES + ), + null) + + ).asJava) + } else { + o.setPid(List( + structuredProperty(r.getAs[Long]("AffiliationId").toString, qualifier( + PidType.mag_id.toString, + PidType.mag_id.toString, + ModelConstants.DNET_PID_TYPES, + ModelConstants.DNET_PID_TYPES + ), + null) + ).asJava) + } + val c = r.getAs[String]("Iso3166Code") + if (c != null) + o.setCountry(qualifier(c, c, "dnet:countries", "dnet:countries")) + else + o.setCountry(ModelConstants.UNKNOWN_COUNTRY) + val ws = r.getAs[String]("OfficialPage") + if (ws != null) + o.setWebsiteurl(field(ws, null)) + val a = new AtomicAction[Organization]() + a.setClazz(classOf[Organization]) + a.setPayload(o) + mapper.writeValueAsString(a) + } + def generateAffiliationRelations(paperAffiliation: Row): List[Relation] = { val affId = s"20|mag_________::${DHPUtils.md5(paperAffiliation.getAs[Long]("AffiliationId").toString)}" diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMagOrganizationAS.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMagOrganizationAS.scala new file mode 100644 index 000000000..096a03f45 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMagOrganizationAS.scala @@ -0,0 +1,42 @@ +package eu.dnetlib.dhp.collection.mag + +import eu.dnetlib.dhp.application.AbstractScalaApplication +import eu.dnetlib.dhp.schema.action.AtomicAction +import eu.dnetlib.dhp.schema.oaf.Organization +import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +class SparkMagOrganizationAS (propertyPath: String, args: Array[String], log: Logger) + extends AbstractScalaApplication(propertyPath, args, log: Logger) { + + /** Here all the spark applications runs this method + * where the whole logic of the spark node is defined + */ + 
override def run(): Unit = { + val magBasePath:String = parser.get("magBasePath") + log.info(s"magBasePath is $magBasePath") + val outputPath:String = parser.get("outputPath") + log.info(s"outputPath is $outputPath") + generateAS(spark,magBasePath, outputPath) + + } + + def generateAS(spark:SparkSession, magBasePath:String,outputPath:String ):Unit = { + import spark.implicits._ + val organizations = MagUtility.loadMagEntity(spark,"Affiliations", magBasePath) + organizations.map(r => MagUtility.generateOrganization(r)).write.mode(SaveMode.Overwrite) + .option("compression", "gzip") + .text(outputPath) + } +} + +object SparkMagOrganizationAS{ + + val log: Logger = LoggerFactory.getLogger(SparkMagOrganizationAS.getClass) + def main(args: Array[String]): Unit = { + new SparkMagOrganizationAS("/eu/dnetlib/dhp/collection/mag/create_organization_AS.json", args, log) + .initialize() + .run() + + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala index 47105b732..e41ccc41a 100644 --- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/collection/mag/MAGMappingTest.scala @@ -10,6 +10,7 @@ class MAGMappingTest { val mapper = new ObjectMapper() + def mappingTest(): Unit = { val spark = SparkSession @@ -18,10 +19,9 @@ class MAGMappingTest { .master("local[*]") .getOrCreate() - val s = new SparkMAGtoOAF(null, null, null) + val s = new SparkMagOrganizationAS(null, null, null) - s.convertMAG(spark, "/home/sandro/Downloads/mag_test", "/home/sandro/Downloads/mag_oaf") - s.generateAffiliations(spark, "/home/sandro/Downloads/mag_test", "/home/sandro/Downloads/mag_oaf") + s.generateAS(spark, "/home/sandro/Downloads/mag_test", "/home/sandro/Downloads/mag_AS") }