From 72bae7af76631c680289bf0ae7472665a9198b10 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 19 Feb 2024 16:20:12 +0100 Subject: [PATCH] [Transformative Agreement] removed the relations from the ActionSet waiting to have the gree light from Ioanna --- .../CreateActionSetSparkJob.java | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java index 70145d800c..e1c3226cd6 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java @@ -13,6 +13,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.MapFunction; @@ -77,22 +78,22 @@ public class CreateActionSetSparkJob implements Serializable { } private static void createActionSet(SparkSession spark, String inputPath, String outputPath) { - spark - .read() - .textFile(inputPath) - .map( - (MapFunction) value -> OBJECT_MAPPER - .readValue(value, TransformativeAgreementModel.class), - Encoders.bean(TransformativeAgreementModel.class)) - .flatMap( - (FlatMapFunction) value -> createRelation( - value) - .iterator(), - Encoders.bean(Relation.class)) - .filter((FilterFunction) Objects::nonNull) - .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p)) - .union( + JavaRDD relations = spark + .read() + .textFile(inputPath) + .map( + (MapFunction) value -> OBJECT_MAPPER + .readValue(value, TransformativeAgreementModel.class), + Encoders.bean(TransformativeAgreementModel.class)) + .flatMap( + (FlatMapFunction) value -> createRelation( + value) + .iterator(), + Encoders.bean(Relation.class)) + .filter((FilterFunction) Objects::nonNull) + .toJavaRDD() + .map(p -> new AtomicAction(p.getClass(), p)); +//TODO relations in stand-by waiting to know if we need to create them or not In case we need just make a union before saving the sequence file spark .read() .textFile(inputPath) @@ -106,7 +107,7 @@ public class CreateActionSetSparkJob implements Serializable { Encoders.bean(Result.class)) .filter((FilterFunction) r -> r != null) .toJavaRDD() - .map(p -> new AtomicAction(p.getClass(), p))) + .map(p -> new AtomicAction(p.getClass(), p)) .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), new Text(OBJECT_MAPPER.writeValueAsString(aa))))