From f9e940a2af7da2015d038ae137f0e90d0d93076e Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 18 Apr 2024 10:09:57 +0200 Subject: [PATCH] update function to generete AS as sequence file --- .../dhp/collection/mag/MagUtility.scala | 4 ++-- .../mag/SparkMagOrganizationAS.scala | 18 +++++++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala index 48cb3276a..660e6b443 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala @@ -678,7 +678,7 @@ object MagUtility extends Serializable { } - def generateOrganization(r: Row): String = { + def generateOrganization(r: Row): (String,String) = { val o = new Organization val affId = s"20|mag_________::${DHPUtils.md5(r.getAs[Long]("AffiliationId").toString)}" @@ -727,7 +727,7 @@ object MagUtility extends Serializable { val a = new AtomicAction[Organization]() a.setClazz(classOf[Organization]) a.setPayload(o) - mapper.writeValueAsString(a) + (a.getClazz.getName,mapper.writeValueAsString(a)) } def generateAffiliationRelations(paperAffiliation: Row): List[Relation] = { diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMagOrganizationAS.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMagOrganizationAS.scala index 096a03f45..1ba166574 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMagOrganizationAS.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMagOrganizationAS.scala @@ -3,6 +3,9 @@ package eu.dnetlib.dhp.collection.mag import eu.dnetlib.dhp.application.AbstractScalaApplication import eu.dnetlib.dhp.schema.action.AtomicAction import eu.dnetlib.dhp.schema.oaf.Organization +import org.apache.hadoop.io.Text +import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec} +import org.apache.hadoop.mapred.SequenceFileOutputFormat import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} @@ -24,9 +27,18 @@ class SparkMagOrganizationAS (propertyPath: String, args: Array[String], log: Lo def generateAS(spark:SparkSession, magBasePath:String,outputPath:String ):Unit = { import spark.implicits._ val organizations = MagUtility.loadMagEntity(spark,"Affiliations", magBasePath) - organizations.map(r => MagUtility.generateOrganization(r)).write.mode(SaveMode.Overwrite) - .option("compression", "gzip") - .text(outputPath) + organizations + .map(r => MagUtility.generateOrganization(r)) + .rdd + .map(s => (new Text(s._1), new Text(s._2))) + .filter(s => s!=null) + .saveAsHadoopFile( + outputPath, + classOf[Text], + classOf[Text], + classOf[SequenceFileOutputFormat[Text, Text]], + classOf[BZip2Codec] + ) } }