From e5879b68c731fe9a3444582b2b96e5301ff5c67f Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 19 Apr 2024 17:14:18 +0200 Subject: [PATCH] [transformative agreement] including reuslt-funder relations to the information imported from the TRs --- .../transformativeagreement/CreateActionSetSparkJob.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java index e8443c033..9880d0260 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/transformativeagreement/CreateActionSetSparkJob.java @@ -93,7 +93,7 @@ public class CreateActionSetSparkJob implements Serializable { .filter((FilterFunction) Objects::nonNull) .toJavaRDD() .map(p -> new AtomicAction(p.getClass(), p)); -//TODO relations in stand-by waiting to know if we need to create them or not In case we need just make a union before saving the sequence file + spark .read() .textFile(inputPath) @@ -108,6 +108,7 @@ public class CreateActionSetSparkJob implements Serializable { .filter((FilterFunction) r -> r != null) .toJavaRDD() .map(p -> new AtomicAction(p.getClass(), p)) + .union(relations) .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), new Text(OBJECT_MAPPER.writeValueAsString(aa))))