[Transformative Agreement] removed the relations from the ActionSet; waiting to get the green light from Ioanna

This commit is contained in:
Miriam Baglioni 2024-02-19 16:20:12 +01:00
parent f0dc12634b
commit 72bae7af76
1 changed file with 18 additions and 17 deletions

View File

@ -13,6 +13,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
@ -77,22 +78,22 @@ public class CreateActionSetSparkJob implements Serializable {
} }
private static void createActionSet(SparkSession spark, String inputPath, String outputPath) { private static void createActionSet(SparkSession spark, String inputPath, String outputPath) {
spark JavaRDD<AtomicAction> relations = spark
.read() .read()
.textFile(inputPath) .textFile(inputPath)
.map( .map(
(MapFunction<String, TransformativeAgreementModel>) value -> OBJECT_MAPPER (MapFunction<String, TransformativeAgreementModel>) value -> OBJECT_MAPPER
.readValue(value, TransformativeAgreementModel.class), .readValue(value, TransformativeAgreementModel.class),
Encoders.bean(TransformativeAgreementModel.class)) Encoders.bean(TransformativeAgreementModel.class))
.flatMap( .flatMap(
(FlatMapFunction<TransformativeAgreementModel, Relation>) value -> createRelation( (FlatMapFunction<TransformativeAgreementModel, Relation>) value -> createRelation(
value) value)
.iterator(), .iterator(),
Encoders.bean(Relation.class)) Encoders.bean(Relation.class))
.filter((FilterFunction<Relation>) Objects::nonNull) .filter((FilterFunction<Relation>) Objects::nonNull)
.toJavaRDD() .toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p)) .map(p -> new AtomicAction(p.getClass(), p));
.union( //TODO relations in stand-by waiting to know if we need to create them or not In case we need just make a union before saving the sequence file
spark spark
.read() .read()
.textFile(inputPath) .textFile(inputPath)
@ -106,7 +107,7 @@ public class CreateActionSetSparkJob implements Serializable {
Encoders.bean(Result.class)) Encoders.bean(Result.class))
.filter((FilterFunction<Result>) r -> r != null) .filter((FilterFunction<Result>) r -> r != null)
.toJavaRDD() .toJavaRDD()
.map(p -> new AtomicAction(p.getClass(), p))) .map(p -> new AtomicAction(p.getClass(), p))
.mapToPair( .mapToPair(
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa)))) new Text(OBJECT_MAPPER.writeValueAsString(aa))))