[DUMP INDICATORS] Add a step that maps each record to a string with ObjectMapper, to support the decorators on getters and setters, so that 'class' can be used as value for a serialized variable

This commit is contained in:
Miriam Baglioni 2022-11-10 09:37:05 +01:00
parent 4b339df43f
commit 5e8cd02acd
2 changed files with 13 additions and 3 deletions

View File

@ -17,6 +17,8 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.oa.graph.dump.exceptions.CardinalityTooHighException;
import eu.dnetlib.dhp.oa.graph.dump.exceptions.NoAvailableEntityTypeException;
@ -63,10 +65,11 @@ public class DumpProducts implements Serializable {
.readPath(spark, inputPath, inputClazz)
.map((MapFunction<I, O>) value -> execMap(value, communityMap, dumpType), Encoders.bean(outputClazz))
.filter((FilterFunction<O>) value -> value != null)
.map((MapFunction<O, String>) r -> new ObjectMapper().writeValueAsString(r), Encoders.STRING())
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
.text(outputPath);
}

View File

@ -8,16 +8,21 @@ import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.model.graph.GraphResult;
import eu.dnetlib.dhp.oa.model.graph.Relation;
import it.unimi.dsi.fastutil.objects.Object2BooleanMap;
/**
* Reads all the entities of the same type (Relation / Results) and saves them in the same folder
@ -73,10 +78,11 @@ public class SparkCollectAndSave implements Serializable {
.union(Utils.readPath(spark, inputPath + "/result/dataset", GraphResult.class))
.union(Utils.readPath(spark, inputPath + "/result/otherresearchproduct", GraphResult.class))
.union(Utils.readPath(spark, inputPath + "/result/software", GraphResult.class))
.map((MapFunction<GraphResult, String>) r -> new ObjectMapper().writeValueAsString(r), Encoders.STRING() )
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "/result");
.text(outputPath + "/result");
} else {
write(
Utils
@ -114,9 +120,10 @@ public class SparkCollectAndSave implements Serializable {
/**
 * Serializes every {@link GraphResult} of the given dataset to a string with a Jackson
 * {@link ObjectMapper} and writes the resulting dataset of strings to {@code outputPath}.
 *
 * The write uses the "compression" option set to "gzip" and {@link SaveMode#Overwrite}.
 *
 * NOTE(review): this listing appears to be a diff rendered without +/- markers, so both the
 * {@code .json(outputPath)} and the {@code .text(outputPath)} terminal calls are shown below;
 * presumably {@code .json} is the removed line and {@code .text} the added one - confirm
 * against the actual file.
 *
 * @param dataSet the results to serialize and write
 * @param outputPath the location the serialized results are written to
 */
private static void write(Dataset<GraphResult> dataSet, String outputPath) {
dataSet
// Each record is turned into a string by ObjectMapper before writing, so the output is a
// Dataset<String> (Encoders.STRING()) instead of a Dataset<GraphResult>.
// NOTE(review): a new ObjectMapper is constructed for every record; consider reusing one.
.map((MapFunction<GraphResult, String>) r -> new ObjectMapper().writeValueAsString(r), Encoders.STRING())
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
.text(outputPath);
}
}