[cleaning] country cleaning must use both PID and AlternateIdentifier fields

This commit is contained in:
Claudio Atzori 2022-12-15 14:49:04 +01:00
parent b8bafab8a0
commit 7b80b24f82
1 changed files with 42 additions and 2 deletions

View File

@ -4,9 +4,12 @@ package eu.dnetlib.dhp.oa.graph.clean.country;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable; import java.io.Serializable;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.swing.text.html.Option; import javax.swing.text.html.Option;
@ -30,6 +33,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob; import eu.dnetlib.dhp.oa.graph.clean.CleanContextSparkJob;
import eu.dnetlib.dhp.schema.oaf.Country; import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty; import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.PidType; import eu.dnetlib.dhp.schema.oaf.utils.PidType;
@ -110,8 +114,8 @@ public class CleanCountrySparkJob implements Serializable {
return r; return r;
} }
if (r List<StructuredProperty> ids = getPidsAndAltIds(r).collect(Collectors.toList());
.getPid() if (ids
.stream() .stream()
.anyMatch( .anyMatch(
p -> p p -> p
@ -148,6 +152,42 @@ public class CleanCountrySparkJob implements Serializable {
.json(inputPath); .json(inputPath);
} }
private static <T extends Result> Stream<StructuredProperty> getPidsAndAltIds(T r) {
final Stream<StructuredProperty> resultPids = Optional
.ofNullable(r.getPid())
.map(Collection::stream)
.orElse(Stream.empty());
final Stream<StructuredProperty> instancePids = Optional
.ofNullable(r.getInstance())
.map(
instance -> instance
.stream()
.flatMap(
i -> Optional
.ofNullable(i.getPid())
.map(Collection::stream)
.orElse(Stream.empty())))
.orElse(Stream.empty());
final Stream<StructuredProperty> instanceAltIds = Optional
.ofNullable(r.getInstance())
.map(
instance -> instance
.stream()
.flatMap(
i -> Optional
.ofNullable(i.getAlternateIdentifier())
.map(Collection::stream)
.orElse(Stream.empty())))
.orElse(Stream.empty());
return Stream
.concat(
Stream.concat(resultPids, instancePids),
instanceAltIds);
}
private static boolean pidInParam(String value, String[] verifyParam) { private static boolean pidInParam(String value, String[] verifyParam) {
for (String s : verifyParam) for (String s : verifyParam)
if (value.startsWith(s)) if (value.startsWith(s))