ActionSets migration serialize the output as plain text files instead of SequenceFiles

This commit is contained in:
Claudio Atzori 2020-04-01 14:58:22 +02:00
parent 201d79021e
commit 7061d07727
1 changed files with 9 additions and 11 deletions

View File

@ -30,6 +30,7 @@ import scala.Tuple2;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Objects;
public class TransformActions implements Serializable { public class TransformActions implements Serializable {
@ -60,7 +61,7 @@ public class TransformActions implements Serializable {
final String targetBaseDir = getTargetBaseDir(isLookupUrl); final String targetBaseDir = getTargetBaseDir(isLookupUrl);
try(SparkSession spark = getSparkSession(parser)) { try(SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration()); final FileSystem fs = FileSystem.get(spark.sparkContext().hadoopConfiguration());
for(String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) { for(String sourcePath : Lists.newArrayList(Splitter.on(",").split(inputPaths))) {
@ -81,25 +82,22 @@ public class TransformActions implements Serializable {
log.info(String.format("transforming actions from '%s' to '%s'", sourcePath, targetDirectory)); log.info(String.format("transforming actions from '%s' to '%s'", sourcePath, targetDirectory));
sc.sequenceFile(sourcePath, Text.class, Text.class) sc.sequenceFile(sourcePath, Text.class, Text.class)
.mapToPair(a -> new Tuple2<>(a._1(), eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(a._2().toString()))) .map(a -> eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(a._2().toString()))
.mapToPair(a -> new Tuple2<>(a._1(), transformAction(a._1().toString(), a._2()))) .map(a -> doTransform(a))
.filter(t -> StringUtils.isNotBlank(t._2().toString())) .filter(Objects::isNull)
.saveAsHadoopFile(targetDirectory.toString(), Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); .filter(a -> a.getPayload() == null)
.map(a -> new ObjectMapper().writeValueAsString(a))
.saveAsTextFile(targetDirectory.toString(), GzipCodec.class);
} }
} }
} }
private Text transformAction(String atomicaActionId, eu.dnetlib.actionmanager.actions.AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException { private Text transformAction(eu.dnetlib.actionmanager.actions.AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException {
final Text out = new Text(); final Text out = new Text();
final ObjectMapper mapper = new ObjectMapper(); final ObjectMapper mapper = new ObjectMapper();
if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) { if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) {
out.set(mapper.writeValueAsString(doTransform(aa))); out.set(mapper.writeValueAsString(doTransform(aa)));
} else {
if (atomicaActionId.contains("dedupSimilarity")) {
out.set(mapper.writeValueAsString(getRelationAtomicAction(atomicaActionId)));
}
} }
return out; return out;
} }