[stats wf] indicators across stats dbs & updates in the org ids #248

Closed
dimitris.pierrakos wants to merge 1742 commits from beta into beta2master_sept_2022
1 changed files with 5 additions and 0 deletions
Showing only changes of commit 8e8933d41a - Show all commits

View File

@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.PropagationConstant.removeOutputDir;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
@ -102,6 +103,7 @@ public class SparkBulkTagJob {
ResultTagger resultTagger = new ResultTagger();
readPath(spark, inputPath, resultClazz)
.map(patchResult(), Encoders.bean(resultClazz))
.filter(Objects::nonNull)
.map(
(MapFunction<R, R>) value -> resultTagger
.enrichContextCriteria(
@ -124,6 +126,9 @@ public class SparkBulkTagJob {
// TODO remove this hack as soon as the values fixed by this method will be provided as NON null
private static <R extends Result> MapFunction<R, R> patchResult() {
return r -> {
if(r.getDataInfo() == null){
return null;
}
if (r.getDataInfo().getDeletedbyinference() == null) {
r.getDataInfo().setDeletedbyinference(false);
}