[SKG-IFv1.1] Adding additional constraints on property values

This commit is contained in:
Miriam Baglioni 2024-09-22 18:21:09 +02:00
parent 09e2ac1fce
commit 638fa4da0a
1 changed file with 57 additions and 1 deletion

View File

@ -3,12 +3,17 @@ package eu.dnetlib.dhp.oa.graph.dump.filterentities;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
@ -69,7 +74,7 @@ public class FilterEntities implements Serializable {
} }
private static <R extends Result> void filterEntities(SparkSession spark, String inputPath, String filterPath, private static <R extends Result> void filterEntities(SparkSession spark, String inputPath, String filterPath,
String workingDir) { String workingDir) {
ModelSupport.entityTypes.keySet().forEach(e -> { ModelSupport.entityTypes.keySet().forEach(e -> {
if (ModelSupport.isResult(e)) { if (ModelSupport.isResult(e)) {
Class<R> resultClazz = ModelSupport.entityTypes.get(e); Class<R> resultClazz = ModelSupport.entityTypes.get(e);
@ -81,6 +86,8 @@ public class FilterEntities implements Serializable {
result result
.join(filterIds, result.col("id").equalTo(filterIds.col("id")), "leftsemi") .join(filterIds, result.col("id").equalTo(filterIds.col("id")), "leftsemi")
.as(Encoders.bean(resultClazz))
.filter((FilterFunction<R>) FilterEntities::verifyLot1Constraints)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
@ -90,4 +97,53 @@ public class FilterEntities implements Serializable {
}); });
} }
private static <R extends eu.dnetlib.dhp.schema.oaf.Result> boolean verifyLot1Constraints(R r) {
if (r.getInstance().stream().anyMatch(i ->
Arrays.asList("10|re3data_____::c4b2081b224be6b3e79d0e5e5556f631","10|openaire____::dbfd07503aaa1ed31beed7dec942f3f4").contains(i.getHostedby().getKey())))
return true;
if (!Optional.ofNullable(r.getTitle()).isPresent() ||
r.getTitle().isEmpty() ||
!Optional.ofNullable(r.getDateofacceptance()).isPresent() ||
r.getDateofacceptance().getValue().isEmpty() ||
!isValidFormat(r.getDateofacceptance().getValue()))
return false;
switch(r.getResulttype().getClassname()){
case "publication":
return
r.getInstance().stream().anyMatch(i -> Arrays.asList("Book", "Article", "Journal", "Data Paper", "Software Paper", "Preprint", "Part of book or chapter of book",
"Thesis", "Master thesis", "Bachelor thesis", "Doctoral thesis", "Conference object", "Research", "Other literature type").contains(i.getInstancetype().getClassid()))
&&
Optional.ofNullable(r.getAuthor()).isPresent() && !r.getAuthor().isEmpty()
&& Optional.ofNullable(r.getDescription()).isPresent() && !r.getDescription().isEmpty();
case "dataset":
return
Optional.ofNullable(r.getAuthor()).isPresent() && !r.getAuthor().isEmpty();
case "software":
return true;
case "otherresearchproduct":
return true;
}
return false;
}
public static boolean isValidFormat(String value) {
try {
DateFormat df = new SimpleDateFormat(DATE_FORMAT);
df.setLenient(false);
df.parse(value);
return true;
} catch (ParseException e) {
return false;
}
}
} }