dump #50

Merged
claudio.atzori merged 98 commits from miriam.baglioni/dnet-hadoop:dump into master 2020-11-04 18:07:01 +01:00
1 changed files with 45 additions and 12 deletions
Showing only changes of commit 387be43fd4 - Show all commits

View File

@ -8,6 +8,7 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -20,7 +21,6 @@ import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation;
/** /**
* Reads all the entities of the same type (Relation / Results) and saves them in the same folder * Reads all the entities of the same type (Relation / Results) and saves them in the same folder
*
*/ */
public class SparkCollectAndSave implements Serializable { public class SparkCollectAndSave implements Serializable {
@ -48,6 +48,11 @@ public class SparkCollectAndSave implements Serializable {
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
final Boolean aggregateResult = Optional
.ofNullable(parser.get("resultAggregation"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -55,13 +60,14 @@ public class SparkCollectAndSave implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
Utils.removeOutputDir(spark, outputPath + "/result"); Utils.removeOutputDir(spark, outputPath + "/result");
run(spark, inputPath, outputPath); run(spark, inputPath, outputPath, aggregateResult);
}); });
} }
private static void run(SparkSession spark, String inputPath, String outputPath) { private static void run(SparkSession spark, String inputPath, String outputPath, boolean aggregate) {
if (aggregate) {
Utils Utils
.readPath(spark, inputPath + "/result/publication", Result.class) .readPath(spark, inputPath + "/result/publication", Result.class)
.union(Utils.readPath(spark, inputPath + "/result/dataset", Result.class)) .union(Utils.readPath(spark, inputPath + "/result/dataset", Result.class))
@ -71,6 +77,25 @@ public class SparkCollectAndSave implements Serializable {
.option("compression", "gzip") .option("compression", "gzip")
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.json(outputPath + "/result"); .json(outputPath + "/result");
} else {
write(
Utils
.readPath(spark, inputPath + "/result/publication", Result.class),
outputPath + "/publication");
write(
Utils
.readPath(spark, inputPath + "/result/dataset", Result.class),
outputPath + "/dataset");
write(
Utils
.readPath(spark, inputPath + "/result/otherresearchproduct", Result.class),
outputPath + "/otheresearchproduct");
write(
Utils
.readPath(spark, inputPath + "/result/software", Result.class),
outputPath + "/software");
}
Utils Utils
.readPath(spark, inputPath + "/relation/publication", Relation.class) .readPath(spark, inputPath + "/relation/publication", Relation.class)
@ -86,4 +111,12 @@ public class SparkCollectAndSave implements Serializable {
.json(outputPath + "/relation"); .json(outputPath + "/relation");
} }
private static void write(Dataset<Result> dataSet, String outputPath) {
dataSet
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
} }