|
|
|
@ -6,9 +6,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
|
|
|
|
import java.io.Serializable;
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
|
|
|
|
|
import eu.dnetlib.dhp.oa.graph.dump.Constants;
|
|
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
|
|
import org.apache.spark.SparkConf;
|
|
|
|
|
import org.apache.spark.api.java.function.FilterFunction;
|
|
|
|
|
import org.apache.spark.api.java.function.MapFunction;
|
|
|
|
|
import org.apache.spark.api.java.function.MapGroupsFunction;
|
|
|
|
|
import org.apache.spark.sql.Dataset;
|
|
|
|
@ -64,19 +64,19 @@ public class SparkResultLinkedToProject implements Serializable {
|
|
|
|
|
isSparkSessionManaged,
|
|
|
|
|
spark -> {
|
|
|
|
|
Utils.removeOutputDir(spark, outputPath);
|
|
|
|
|
writeResultsLikedToProjects(spark, inputClazz, inputPath, outputPath, relationPath);
|
|
|
|
|
writeResultsLinkedToProjects(spark, inputClazz, inputPath, outputPath, relationPath);
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static <R extends Result> void writeResultsLikedToProjects(SparkSession spark, Class<R> inputClazz,
|
|
|
|
|
String inputPath, String outputPath, String relationPath) {
|
|
|
|
|
private static <R extends Result> void writeResultsLinkedToProjects(SparkSession spark, Class<R> inputClazz,
|
|
|
|
|
String inputPath, String outputPath, String relationPath) {
|
|
|
|
|
|
|
|
|
|
Dataset<R> results = Utils
|
|
|
|
|
.readPath(spark, inputPath, inputClazz)
|
|
|
|
|
.filter("dataInfo.deletedbyinference = false and datainfo.invisible = false");
|
|
|
|
|
Dataset<Relation> relations = Utils
|
|
|
|
|
.readPath(spark, relationPath, Relation.class)
|
|
|
|
|
.filter("dataInfo.deletedbyinference = false and lower(relClass) = 'isproducedby'");
|
|
|
|
|
.filter("dataInfo.deletedbyinference = false and lower(relClass) = '" + Constants.RESULT_PROJECT_IS_PRODUCED_BY.toLowerCase() + "'");
|
|
|
|
|
|
|
|
|
|
relations
|
|
|
|
|
.joinWith(
|
|
|
|
|