From 19806c2ae3267f0e1080b2afffba4c51b4fe07c7 Mon Sep 17 00:00:00 2001
From: Miriam Baglioni
Date: Tue, 23 Jul 2024 17:12:55 +0200
Subject: [PATCH] [SDG]fixed switch of methods

---
 .../PrepareSDGSparkJob.java                        | 18 +++++++++++++-----
 .../sdgnodoi/CreateActionSetSparkJob.java          |  7 ++++++-
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java
index 012178c1e..be22077c3 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareSDGSparkJob.java
@@ -73,7 +73,7 @@ public class PrepareSDGSparkJob implements Serializable {
 			});
 	}
 
-	private static void doPrepareoaid(SparkSession spark, String sourcePath, String outputPath) {
+	private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) {
 		Dataset<Row> sdgDataset = spark
 			.read()
 			.format("csv")
@@ -84,7 +84,7 @@
 			.load(sourcePath);
 
 		sdgDataset
-			.groupByKey((MapFunction<Row, String>) v -> ((String) v.getAs("oaid")).toLowerCase(), Encoders.STRING())
+			.groupByKey((MapFunction<Row, String>) v -> ((String) v.getAs("doi")).toLowerCase(), Encoders.STRING())
 			.mapGroups(
 				(MapGroupsFunction<String, Row, Result>) (k, it) -> getResult(
@@ -100,11 +100,19 @@
 			.json(outputPath + "/sdg");
 	}
 
-	private static void doPrepare(SparkSession spark, String sourcePath, String outputPath) {
-		Dataset<Row> sdgDataset = spark.read().csv(sourcePath);
+	private static void doPrepareoaid(SparkSession spark, String sourcePath, String outputPath) {
+		Dataset<Row> sdgDataset = spark
+			.read()
+			.format("csv")
+			.option("sep", DEFAULT_DELIMITER)
+			.option("inferSchema", "true")
+			.option("header", "true")
+			.option("quotes", "\"")
+			.load(sourcePath);
+		;
 		sdgDataset
-			.groupByKey((MapFunction<Row, String>) r -> ((String) r.getAs("doi")).toLowerCase(), Encoders.STRING())
+			.groupByKey((MapFunction<Row, String>) r -> "50|" + ((String) r.getAs("oaid")), Encoders.STRING())
 			.mapGroups(
 				(MapGroupsFunction<String, Row, Result>) PrepareSDGSparkJob::getResult,
 				Encoders.bean(Result.class))
 			.write()
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/sdgnodoi/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/sdgnodoi/CreateActionSetSparkJob.java
index a2f8cfa39..0bc3b524b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/sdgnodoi/CreateActionSetSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/sdgnodoi/CreateActionSetSparkJob.java
@@ -10,6 +10,7 @@ import java.util.Optional;
 
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Hdfs;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -23,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.action.AtomicAction;
 import eu.dnetlib.dhp.schema.oaf.Result;
 import scala.Tuple2;
@@ -63,7 +65,10 @@ public class CreateActionSetSparkJob implements Serializable {
 		runWithSparkSession(
 			conf,
 			isSparkSessionManaged,
-			spark -> createActionSet(spark, inputPath, outputPath));
+			spark -> {
+				HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+				createActionSet(spark, inputPath, outputPath);
+			});
 	}
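
Note (not part of the patch): the CreateActionSetSparkJob hunk makes re-runs idempotent by deleting the output path before the action set is written. A minimal sketch of the same pattern using the plain Hadoop FileSystem API, assuming HdfsSupport.remove behaves like a recursive delete-if-exists (the class and method names below are illustrative, not part of the codebase):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class IdempotentOutput {

        /** Deletes outputPath (recursively) if it exists, so a re-run writes from scratch. */
        public static void clear(String outputPath, Configuration conf) throws IOException {
            Path path = new Path(outputPath);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(path)) {
                fs.delete(path, true); // 'true' = recursive, assumed to mirror HdfsSupport.remove
            }
        }
    }

Calling IdempotentOutput.clear(outputPath, spark.sparkContext().hadoopConfiguration()) before createActionSet would have the same effect as the HdfsSupport.remove call added by the patch.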