forked from D-Net/dnet-hadoop
fixing issue on previous implementation
This commit is contained in:
parent
7e940f1991
commit
b9b6bdb2e6
|
@ -9,10 +9,7 @@ import java.util.*;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.*;
|
||||||
import org.apache.spark.sql.Encoders;
|
|
||||||
import org.apache.spark.sql.SaveMode;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -22,6 +19,7 @@ import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.Utils;
|
import eu.dnetlib.dhp.oa.graph.dump.Utils;
|
||||||
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
|
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
|
||||||
import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
|
import eu.dnetlib.dhp.schema.dump.oaf.community.CommunityResult;
|
||||||
|
import eu.dnetlib.dhp.schema.dump.oaf.community.Project;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
@ -69,11 +67,12 @@ public class SparkDumpFunderResults implements Serializable {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath, String relationPath) {
|
private static void writeResultProjectList(SparkSession spark, String inputPath, String outputPath,
|
||||||
|
String relationPath) {
|
||||||
|
|
||||||
Dataset<Relation> relation = Utils
|
Dataset<Relation> relation = Utils
|
||||||
.readPath(spark, relationPath + "/relation", Relation.class)
|
.readPath(spark, relationPath + "/relation", Relation.class)
|
||||||
.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
|
.filter("dataInfo.deletedbyinference = false and relClass = 'isProducedBy'");
|
||||||
|
|
||||||
Dataset<CommunityResult> result = Utils
|
Dataset<CommunityResult> result = Utils
|
||||||
.readPath(spark, inputPath + "/publication", CommunityResult.class)
|
.readPath(spark, inputPath + "/publication", CommunityResult.class)
|
||||||
|
@ -81,18 +80,40 @@ public class SparkDumpFunderResults implements Serializable {
|
||||||
.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class))
|
.union(Utils.readPath(spark, inputPath + "/otherresearchproduct", CommunityResult.class))
|
||||||
.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
|
.union(Utils.readPath(spark, inputPath + "/software", CommunityResult.class));
|
||||||
|
|
||||||
result
|
List<String> funderList = relation
|
||||||
.joinWith(relation, result.col("id").equalTo(relation.col("target")), "inner")
|
.select("target")
|
||||||
.map((MapFunction<Tuple2<CommunityResult, Relation>, FunderResults>) value -> {
|
.map((MapFunction<Row, String>) value -> value.getString(0).substring(0, 15), Encoders.STRING())
|
||||||
FunderResults res = (FunderResults) value._1();
|
.distinct()
|
||||||
res.setFunder_id(value._2().getSource().substring(3, 15));
|
.collectAsList();
|
||||||
return res;
|
|
||||||
}, Encoders.bean(FunderResults.class))
|
// Dataset<CommunityResult> results = result
|
||||||
.write()
|
// .joinWith(relation, result.col("id").equalTo(relation.col("target")), "inner")
|
||||||
.partitionBy("funder_id")
|
// .map((MapFunction<Tuple2<CommunityResult, Relation>, CommunityResult>) value -> {
|
||||||
.mode(SaveMode.Overwrite)
|
// return value._1();
|
||||||
.json(outputPath);
|
// }, Encoders.bean(CommunityResult.class));
|
||||||
|
|
||||||
|
funderList.forEach(funder -> writeFunderResult(funder, result, outputPath));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void writeFunderResult(String funder, Dataset<CommunityResult> results, String outputPath) {
|
||||||
|
|
||||||
|
results.map((MapFunction<CommunityResult, CommunityResult>) r -> {
|
||||||
|
if (!Optional.ofNullable(r.getProjects()).isPresent()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
for (Project p : r.getProjects()) {
|
||||||
|
if (p.getId().startsWith(funder)) {
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}, Encoders.bean(CommunityResult.class))
|
||||||
|
.filter(Objects::nonNull)
|
||||||
|
.write()
|
||||||
|
.mode(SaveMode.Overwrite)
|
||||||
|
.option("compression", "gzip")
|
||||||
|
.json(outputPath + "/" + funder);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.function.FilterFunction;
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SaveMode;
|
import org.apache.spark.sql.SaveMode;
|
||||||
|
@ -70,15 +71,25 @@ public class SparkResultLinkedToProject implements Serializable {
|
||||||
private static <R extends Result> void writeResultsLikedToProjects(SparkSession spark, Class<R> inputClazz,
|
private static <R extends Result> void writeResultsLikedToProjects(SparkSession spark, Class<R> inputClazz,
|
||||||
String inputPath, String outputPath, String relationPath) {
|
String inputPath, String outputPath, String relationPath) {
|
||||||
|
|
||||||
Dataset<R> results = Utils.readPath(spark, inputPath, inputClazz);
|
Dataset<R> results = Utils
|
||||||
|
.readPath(spark, inputPath, inputClazz)
|
||||||
|
.filter("dataInfo.deletedbyinference = false and datainfo.invisible = false");
|
||||||
Dataset<Relation> relations = Utils
|
Dataset<Relation> relations = Utils
|
||||||
.readPath(spark, relationPath, Relation.class)
|
.readPath(spark, relationPath, Relation.class)
|
||||||
.filter("dataInfo.deletedbyinference = false and relClass = 'produces'");
|
.filter("dataInfo.deletedbyinference = false and lower(relClass) = 'isproducedby'");
|
||||||
|
|
||||||
relations
|
relations
|
||||||
.joinWith(
|
.joinWith(
|
||||||
results, relations.col("target").equalTo(results.col("id")),
|
results, relations.col("source").equalTo(results.col("id")),
|
||||||
"inner")
|
"inner")
|
||||||
.map((MapFunction<Tuple2<Relation, R>, R>) t2 -> t2._2(), Encoders.bean(inputClazz))
|
.groupByKey(
|
||||||
|
(MapFunction<Tuple2<Relation, R>, String>) value -> value
|
||||||
|
._2()
|
||||||
|
.getId(),
|
||||||
|
Encoders.STRING())
|
||||||
|
.mapGroups((MapGroupsFunction<String, Tuple2<Relation, R>, R>) (k, it) -> {
|
||||||
|
return it.next()._2();
|
||||||
|
}, Encoders.bean(inputClazz))
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
|
|
Loading…
Reference in New Issue