diff --git a/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscRelationsStep2.java b/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscRelationsStep2.java
new file mode 100644
index 0000000..27dcaa0
--- /dev/null
+++ b/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscRelationsStep2.java
@@ -0,0 +1,92 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.eosc;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import scala.Tuple2;
+
+/**
+ * Selects the relations involving the results dumped in the first step.
+ *
+ * @author miriam.baglioni
+ * @Date 27/07/22
+ */
+public class SelectEoscRelationsStep2 implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(SelectEoscRelationsStep2.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SelectEoscRelationsStep2.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/reletion_selection_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String resultPath = parser.get("resultPath");
+        log.info("resultPath: {}", resultPath);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, resultPath + "/relation");
+                selectRelations(spark, inputPath, resultPath + "/relation", resultPath);
+            });
+    }
+
+    private static void selectRelations(SparkSession spark, String inputPath, String outputPath, String resultPath) {
+        // union of the results dumped in the first step, one directory per result type
+        Dataset<GraphResult> results = Utils
+            .readPath(spark, resultPath + "/publication", GraphResult.class)
+            .union(
+                Utils
+                    .readPath(spark, resultPath + "/dataset", GraphResult.class))
+            .union(
+                Utils
+                    .readPath(spark, resultPath + "/software", GraphResult.class))
+            .union(
+                Utils
+                    .readPath(spark, resultPath + "/otherresearchproduct", GraphResult.class));
+
+        // relations of the graph that are neither deleted by inference nor invisible
+        Dataset<Relation> relations = Utils
+            .readPath(spark, inputPath + "/relation", Relation.class)
+            .filter(
+                (FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
+                    !r.getDataInfo().getInvisible());
+
+        // TODO: restrict the relations to those involving the dumped results, e.g.
+//        relations.joinWith(results, relations.col("source").equalTo(results.col("id")), "left")
+//            .map((MapFunction<Tuple2<Relation, GraphResult>, eu.dnetlib.dhp.schema.dump.oaf.Relation>) );
+    }
+
+}
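Note on the commented-out join at the end of `selectRelations`: the mapping toward `eu.dnetlib.dhp.schema.dump.oaf.Relation` is left unfinished in this diff. Below is a minimal sketch of one way it could be completed; restricting only the relation *source* to the dumped results, and keeping the graph-model `Relation` rather than mapping it to the dump model, are assumptions, not the author's confirmed intent.

```java
// Hypothetical completion of the commented-out join in selectRelations(...).
// Assumptions: an inner join on the source side is enough (the target side may
// need the same treatment), and the graph-model Relation is kept as-is.
// Requires org.apache.spark.sql.Encoders among the imports.
private static Dataset<Relation> restrictToEoscResults(
    Dataset<Relation> relations, Dataset<GraphResult> results) {
    return relations
        .joinWith(results, relations.col("source").equalTo(results.col("id"))) // inner join
        .map(
            (MapFunction<Tuple2<Relation, GraphResult>, Relation>) t -> t._1(),
            Encoders.bean(Relation.class));
}
```

The result of such a method could then be written to `outputPath` with the same `SaveMode.Overwrite`/gzip options used by the first step.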
diff --git a/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java b/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java
new file mode 100644
index 0000000..b62bb54
--- /dev/null
+++ b/src/main/java/eu/dnetlib/dhp/oa/graph/dump/eosc/SelectEoscResultsJobStep1.java
@@ -0,0 +1,91 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.eosc;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.Constants;
+import eu.dnetlib.dhp.oa.graph.dump.ResultMapper;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult;
+import eu.dnetlib.dhp.schema.oaf.Result;
+
+/**
+ * Selects the results tagged with the EOSC context and dumps them as GraphResult.
+ *
+ * @author miriam.baglioni
+ * @Date 27/07/22
+ */
+public class SelectEoscResultsJobStep1 implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(SelectEoscResultsJobStep1.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SelectEoscResultsJobStep1.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump/eosc_select_result_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String resultClassName = parser.get("resultTableName");
+        log.info("resultTableName: {}", resultClassName);
+
+        Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
+
+        SparkConf conf = new SparkConf();
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, outputPath);
+                selectEoscResults(spark, inputPath, outputPath, inputClazz);
+            });
+    }
+
+    private static <R extends Result> void selectEoscResults(SparkSession spark, String inputPath, String outputPath,
+        Class<R> inputClazz) {
+        Utils
+            .readPath(spark, inputPath, inputClazz)
+            // results that are visible, not deleted by inference, and tagged with the EOSC context
+            .filter(
+                (FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() && !r.getDataInfo().getInvisible()
+                    && r.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
+            .map(
+                (MapFunction<R, GraphResult>) r -> (GraphResult) ResultMapper
                    .map(r, null, Constants.DUMPTYPE.COMPLETE.getType()),
+                Encoders.bean(GraphResult.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+
+}
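Neither the Oozie workflow nor the two parameter files (`eosc_select_result_parameters.json`, `reletion_selection_parameters.json`) are part of this diff, so the exact option names are not visible here. Purely as an illustration of how the two steps chain, here is a hypothetical local driver; the flag spellings, paths, and table-class names are assumptions inferred from the `parser.get(...)` calls above.

```java
// Hypothetical wiring of the two steps (normally handled by an Oozie workflow).
// All paths, flags, and table names below are illustrative assumptions.
public class EoscDumpDriver {
    public static void main(String[] args) throws Exception {
        String graph = "/tmp/graph";     // sourcePath: the materialized graph
        String dump = "/tmp/eosc_dump";  // result path shared by the two steps

        // step 1: one run per result type, each writing under the common dump path
        String[][] types = {
            {"publication", "Publication"},
            {"dataset", "Dataset"},
            {"otherresearchproduct", "OtherResearchProduct"},
            {"software", "Software"}
        };
        for (String[] t : types) {
            SelectEoscResultsJobStep1.main(new String[] {
                "-isSparkSessionManaged", Boolean.FALSE.toString(),
                "-sourcePath", graph + "/" + t[0],
                "-outputPath", dump + "/" + t[0],
                "-resultTableName", "eu.dnetlib.dhp.schema.oaf." + t[1]
            });
        }

        // step 2: selects the relations among the results dumped above
        SelectEoscRelationsStep2.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", graph,
            "-resultPath", dump
        });
    }
}
```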