diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java index 19a0cb5c9..65dec7b7f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java @@ -30,6 +30,7 @@ import scala.Tuple2; import java.io.IOException; import java.io.Serializable; import java.util.LinkedList; +import java.util.Objects; public class TransformActions implements Serializable { @@ -60,7 +61,7 @@ public class TransformActions implements Serializable { final String targetBaseDir = getTargetBaseDir(isLookupUrl); try(SparkSession spark = getSparkSession(parser)) { - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); for(String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) { @@ -81,25 +82,22 @@ public class TransformActions implements Serializable { log.info(String.format("transforming actions from '%s' to '%s'", sourcePath, targetDirectory)); sc.sequenceFile(sourcePath, Text.class, Text.class) - .mapToPair(a -> new Tuple2<>(a._1(), eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(a._2().toString()))) - .mapToPair(a -> new Tuple2<>(a._1(), transformAction(a._1().toString(), a._2()))) - .filter(t -> StringUtils.isNotBlank(t._2().toString())) - .saveAsHadoopFile(targetDirectory.toString(), Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + .map(a -> eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(a._2().toString())) + .map(a -> doTransform(a)) + .filter(Objects::isNull) + .filter(a -> a.getPayload() == null) + .map(a -> new ObjectMapper().writeValueAsString(a)) + .saveAsTextFile(targetDirectory.toString(), GzipCodec.class); } } } - private Text transformAction(String atomicaActionId, eu.dnetlib.actionmanager.actions.AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException { + private Text transformAction(eu.dnetlib.actionmanager.actions.AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException { final Text out = new Text(); final ObjectMapper mapper = new ObjectMapper(); if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) { out.set(mapper.writeValueAsString(doTransform(aa))); - } else { - if (atomicaActionId.contains("dedupSimilarity")) { - out.set(mapper.writeValueAsString(getRelationAtomicAction(atomicaActionId))); - } } - return out; }