package eu.dnetlib.dhp.oa.graph.dump;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap;
import eu.dnetlib.dhp.schema.oaf.*;

/**
 * Fires the execution of the actual dump for result entities. If the dump is for RC/RI products, it checks that each
 * result belongs to at least one RC/RI before "asking" for its mapping.
 */
public class DumpProducts implements Serializable {

	public <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void run(
		Boolean isSparkSessionManaged,
		String inputPath,
		String outputPath,
		String communityMapPath,
		Class<I> inputClazz,
		Class<O> outputClazz,
		boolean graph) {

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
				execDump(spark, inputPath, outputPath, communityMapPath, inputClazz, outputClazz, graph);
			});
	}

	public static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> void execDump(
		SparkSession spark,
		String inputPath,
		String outputPath,
		String communityMapPath,
		Class<I> inputClazz,
		Class<O> outputClazz,
		boolean graph) {

		CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath);

		Utils
			.readPath(spark, inputPath, inputClazz)
			// map each input entity to its dump representation; null marks entities to be skipped
			.map((MapFunction<I, O>) value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz))
			// drop the entities discarded by execMap
			.filter((FilterFunction<O>) Objects::nonNull)
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath);
	}

	@SuppressWarnings("unchecked")
	private static <I extends OafEntity, O extends eu.dnetlib.dhp.schema.dump.oaf.Result> O execMap(
		I value,
		CommunityMap communityMap,
		boolean graph) {

		// skip entities deleted by inference or lacking provenance information
		Optional<DataInfo> odInfo = Optional.ofNullable(value.getDataInfo());
		if (odInfo.isPresent()) {
			if (odInfo.get().getDeletedbyinference()) {
				return null;
			}
		} else {
			return null;
		}

		if (!graph) {
			// RC/RI dump: keep the result only if it belongs to at least one community in the map
			Set<String> communities = communityMap.keySet();
			Optional<List<Context>> inputContext = Optional
				.ofNullable(((eu.dnetlib.dhp.schema.oaf.Result) value).getContext());
			if (!inputContext.isPresent()) {
				return null;
			}
			List<String> toDumpFor = inputContext.get().stream().map(c -> {
				if (communities.contains(c.getId())) {
					return c.getId();
				}
				// context ids may carry a suffix (e.g. community::subcategory): match on the community prefix
				if (c.getId().contains("::")
					&& communities.contains(c.getId().substring(0, c.getId().indexOf("::")))) {
					return c.getId().substring(0, c.getId().indexOf("::"));
				}
				return null;
			}).filter(Objects::nonNull).collect(Collectors.toList());
			// the list is only used as a non-empty signal of community membership
			if (toDumpFor.isEmpty()) {
				return null;
			}
		}
		return (O) ResultMapper.map(value, communityMap, graph);
	}
}
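
/*
 * Minimal usage sketch (hypothetical driver code, not part of this class): the paths below and the choice of
 * Publication/GraphResult as input/output classes are illustrative assumptions, not values fixed by this file.
 *
 *   DumpProducts dump = new DumpProducts();
 *   dump.run(
 *       Boolean.TRUE,                            // let runWithSparkSession manage the SparkSession
 *       "/tmp/prod_provision/graph/publication", // hypothetical input path with serialized OAF entities
 *       "/tmp/dump/publication",                 // hypothetical output path for the gzipped JSON dump
 *       "/tmp/dump/communityMap",                // hypothetical path of the serialized CommunityMap
 *       eu.dnetlib.dhp.schema.oaf.Publication.class,
 *       eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult.class, // assumed dump-side result class
 *       true);                                   // whole-graph dump: the RC/RI community filter is skipped
 */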