diff --git a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/FilterEntities.java b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/FilterEntities.java
index 3cdb299..2840dde 100644
--- a/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/FilterEntities.java
+++ b/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/FilterEntities.java
@@ -3,12 +3,19 @@ package eu.dnetlib.dhp.oa.graph.dump.filterentities;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.*;
@@ -69,7 +76,7 @@ public class FilterEntities implements Serializable {
 	}
 
 	private static void filterEntities(SparkSession spark, String inputPath, String filterPath,
-		String workingDir) {
+			String workingDir) {
 		ModelSupport.entityTypes.keySet().forEach(e -> {
 			if (ModelSupport.isResult(e)) {
 				Class resultClazz = ModelSupport.entityTypes.get(e);
@@ -81,6 +88,8 @@ public class FilterEntities implements Serializable {
 
 				result
 					.join(filterIds, result.col("id").equalTo(filterIds.col("id")), "leftsemi")
+					.as(Encoders.bean(resultClazz))
+					.filter((FilterFunction) FilterEntities::verifyLot1Constraints)
 					.write()
 					.mode(SaveMode.Overwrite)
 					.option("compression", "gzip")
@@ -90,4 +99,72 @@ public class FilterEntities implements Serializable {
 		});
 
 	}
+
+	/** hostedby datasource keys whose records are accepted unconditionally. */
+	private static final Set<String> WHITELISTED_HOSTEDBY_KEYS = new HashSet<>(Arrays.asList(
+		"10|re3data_____::c4b2081b224be6b3e79d0e5e5556f631",
+		"10|openaire____::dbfd07503aaa1ed31beed7dec942f3f4"));
+
+	/** instance types accepted for results of type publication. */
+	private static final Set<String> PUBLICATION_INSTANCE_TYPES = new HashSet<>(Arrays.asList(
+		"Book", "Article", "Journal", "Data Paper", "Software Paper", "Preprint",
+		"Part of book or chapter of book", "Thesis", "Master thesis", "Bachelor thesis",
+		"Doctoral thesis", "Conference object", "Research", "Other literature type"));
+
+	/**
+	 * Verifies the LOT1 acceptance constraints for a result record. A record is accepted
+	 * when it is hosted by a whitelisted datasource, or when it carries a title and a
+	 * date of acceptance in the expected format plus the type-specific requirements
+	 * (accepted instance type, authors and abstract for publications; authors for datasets).
+	 */
+	// NOTE(review): type parameter added so the method compiles; the original patch used a
+	// bare `R`. Result here is the dnet schema base class — confirm it is imported above.
+	private static <R extends Result> boolean verifyLot1Constraints(R r) {
+		// guard against null instance lists to avoid NPEs on malformed records
+		if (Optional
+			.ofNullable(r.getInstance())
+			.map(instances -> instances
+				.stream()
+				.anyMatch(i -> WHITELISTED_HOSTEDBY_KEYS.contains(i.getHostedby().getKey())))
+			.orElse(false))
+			return true;
+
+		// common mandatory fields: a title and a parsable date of acceptance
+		if (r.getTitle() == null || r.getTitle().isEmpty()
+			|| r.getDateofacceptance() == null
+			|| r.getDateofacceptance().getValue().isEmpty()
+			|| !isValidFormat(r.getDateofacceptance().getValue()))
+			return false;
+
+		switch (r.getResulttype().getClassname()) {
+			case "publication":
+				return r
+					.getInstance()
+					.stream()
+					.anyMatch(i -> PUBLICATION_INSTANCE_TYPES.contains(i.getInstancetype().getClassid()))
+					&& r.getAuthor() != null && !r.getAuthor().isEmpty()
+					&& r.getDescription() != null && !r.getDescription().isEmpty();
+			case "dataset":
+				return r.getAuthor() != null && !r.getAuthor().isEmpty();
+			case "software":
+			case "otherresearchproduct":
+				return true;
+			default:
+				return false;
+		}
+	}
+
+	/**
+	 * Checks whether the given value parses according to DATE_FORMAT.
+	 * A SimpleDateFormat is created per invocation on purpose: the class is not
+	 * thread-safe and this method runs concurrently inside Spark tasks.
+	 */
+	public static boolean isValidFormat(String value) {
+		try {
+			DateFormat df = new SimpleDateFormat(DATE_FORMAT);
+			df.setLenient(false);
+			df.parse(value);
+			return true;
+		} catch (ParseException e) {
+			return false;
+		}
+	}
 }