From ebfba38ab604ef6dd6a8ba38a280e9ca04b2ff70 Mon Sep 17 00:00:00 2001
From: Serafeim Chatzopoulos
Date: Fri, 28 Jul 2023 19:03:47 +0300
Subject: [PATCH] Add changes from code review

---
 .../PrepareAffiliationRelations.java          | 58 +++++++------------
 1 file changed, 22 insertions(+), 36 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java
index 9b5d4a2ca..381558aae 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java
@@ -10,15 +10,15 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,45 +82,32 @@ public class PrepareAffiliationRelations implements Serializable {
 	private static void prepareAffiliationRelations(SparkSession spark, String inputPath,
 		String outputPath) {
 
-		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-
 		// load and parse affiliation relations from HDFS
-		JavaRDD<AffiliationRelationDeserializer> affiliationRelationsDeserializeRDD = sc
-			.textFile(inputPath)
-			.map(item -> OBJECT_MAPPER.readValue(item, AffiliationRelationDeserializer.class));
+		Dataset<Row> df = spark
+			.read()
+			.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:ARRAY<STRING>,`Confidence`:DOUBLE>>")
+			.json(inputPath);
 
-		// convert affiliation to an internal representation
-		Dataset<AffiliationRelationModel> affiliationRelations = spark
-			.createDataset(
-				affiliationRelationsDeserializeRDD
-					.flatMap(
-						entry -> entry
-							.getMatchings()
-							.stream()
-							.flatMap(
-								matching -> matching
-									.getRorId()
-									.stream()
-									.map(
-										rorId -> new AffiliationRelationModel(
-											entry.getDoi(),
-											rorId,
-											matching.getConfidence())))
-							.collect(Collectors.toList())
-							.iterator())
-					.rdd(),
-				Encoders.bean(AffiliationRelationModel.class));
+		// unroll nested arrays
+		df = df
+			.withColumn("matching", functions.explode(new Column("Matchings")))
+			.withColumn("rorid", functions.explode(new Column("matching.RORid")))
+			.select(
+				new Column("DOI").as("doi"),
+				new Column("rorid"),
+				new Column("matching.Confidence").as("confidence"));
 
 		// prepare action sets for affiliation relations
-		affiliationRelations
-			.flatMap((FlatMapFunction<AffiliationRelationModel, Relation>) affRel -> {
+		df
+			.toJavaRDD()
+			.flatMap((FlatMapFunction<Row, Relation>) row -> {
 
 				// DOI to OpenAIRE id
 				final String paperId = ID_PREFIX
-					+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", affRel.getDoi()));
+					+ IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi")));
 
 				// ROR id to OpenAIRE id
-				final String affId = GenerateRorActionSetJob.calculateOpenaireId(affRel.getRorId());
+				final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid"));
 
 				Qualifier qualifier = OafMapperUtils
 					.qualifier(
@@ -137,18 +124,17 @@ public class PrepareAffiliationRelations implements Serializable {
 					true,
 					false,
 					qualifier,
-					Double.toString(affRel.getConfidence()));
+					Double.toString(row.getAs("confidence")));
 
 				// return bi-directional relations
 				return getAffiliationRelationPair(paperId, affId, dataInfo).iterator();
 
-			}, Encoders.bean(Relation.class))
-			.toJavaRDD()
+			})
 			.map(p -> new AtomicAction(Relation.class, p))
 			.mapToPair(
 				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
 					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
-			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
 	}
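
Note for reviewers (not part of the patch): the change above replaces the Jackson/JavaRDD deserialization with Spark's schema-driven JSON reader plus functions.explode(). Below is a minimal self-contained sketch of that read-and-flatten pattern, runnable locally; the class name, sample record, and input path are made up for illustration and are not part of this PR.

import org.apache.spark.sql.*;

public class AffiliationExplodeSketch {

	public static void main(String[] args) {

		SparkSession spark = SparkSession
			.builder()
			.master("local[*]")
			.appName("AffiliationExplodeSketch")
			.getOrCreate();

		// One JSON object per line, in the shape the job expects, e.g.:
		// {"DOI":"10.1234/abc","Matchings":[{"RORid":["https://ror.org/01abc"],"Confidence":0.94}]}
		Dataset<Row> df = spark
			.read()
			.schema("`DOI` STRING, `Matchings` ARRAY<STRUCT<`RORid`:ARRAY<STRING>,`Confidence`:DOUBLE>>")
			.json("/tmp/affiliations-sample.json"); // hypothetical sample input

		// explode() emits one output row per array element, so exploding twice
		// unrolls the DOI -> Matchings -> RORid nesting into flat
		// (doi, rorid, confidence) triples, one per DOI/ROR pair.
		df
			.withColumn("matching", functions.explode(new Column("Matchings")))
			.withColumn("rorid", functions.explode(new Column("matching.RORid")))
			.select(
				new Column("DOI").as("doi"),
				new Column("rorid"),
				new Column("matching.Confidence").as("confidence"))
			.show(false);

		spark.stop();
	}
}

Declaring the schema up front avoids a full schema-inference pass over the input, and explode() replaces the hand-written nested stream flattening in the removed code; the GzipCodec argument in the final hunk additionally compresses the emitted sequence files on write.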