enrichment steps #38
|
@ -4,6 +4,7 @@ package eu.dnetlib.dhp.bulktag;
|
||||||
import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
|
import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
@ -100,6 +101,7 @@ public class SparkBulkTagJob {
|
||||||
|
|
||||||
ResultTagger resultTagger = new ResultTagger();
|
ResultTagger resultTagger = new ResultTagger();
|
||||||
readPath(spark, inputPath, resultClazz)
|
readPath(spark, inputPath, resultClazz)
|
||||||
|
.map(patchResult(), Encoders.bean(resultClazz))
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<R, R>) value -> resultTagger
|
(MapFunction<R, R>) value -> resultTagger
|
||||||
.enrichContextCriteria(
|
.enrichContextCriteria(
|
||||||
|
@ -119,4 +121,17 @@ public class SparkBulkTagJob {
|
||||||
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO remove this hack as soon as the values fixed by this method will be provided as NON null
|
||||||
|
private static <R extends Result> MapFunction<R, R> patchResult() {
|
||||||
|
return (MapFunction<R, R>) r -> {
|
||||||
|
if (r.getDataInfo().getDeletedbyinference() == null) {
|
||||||
|
r.getDataInfo().setDeletedbyinference(false);
|
||||||
|
}
|
||||||
|
if (r.getContext() == null) {
|
||||||
|
r.setContext(new ArrayList<>());
|
||||||
|
}
|
||||||
|
return r;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue