diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipDeserialize.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipDeserialize.java new file mode 100644 index 000000000..d5b2ced7c --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/BipDeserialize.java @@ -0,0 +1,28 @@ + +package eu.dnetlib.dhp.actionmanager.bipfinder; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +/** + * Class that maps the model of the bipFinder! input data. + * Only needed for deserialization purposes + */ + +public class BipDeserialize extends HashMap> implements Serializable { + + public BipDeserialize() { + super(); + } + + public List get(String key) { + + if (super.get(key) == null) { + return new ArrayList<>(); + } + return super.get(key); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/CollectAndSave.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/CollectAndSave.java new file mode 100644 index 000000000..0bebe2fb0 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/CollectAndSave.java @@ -0,0 +1,85 @@ + +package eu.dnetlib.dhp.actionmanager.bipfinder; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.schema.oaf.Result; + +/** + * Just collects all the atomic actions produced for the different results and saves them in + * outputpath for the ActionSet + */ +public class CollectAndSave implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(CollectAndSave.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + CollectAndSave.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("inputPath"); + log.info("inputPath {}: ", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + collectAndSave(spark, inputPath, outputPath); + }); + } + + private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + sc + .sequenceFile(inputPath + "/publication", Text.class, Text.class) + .union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class)) + .union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class)) + .union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class)) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + ; + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/KeyValue.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/KeyValue.java new file mode 100644 index 000000000..6909a9634 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/KeyValue.java @@ -0,0 +1,26 @@ + +package eu.dnetlib.dhp.actionmanager.bipfinder; + +import java.io.Serializable; + +public class KeyValue implements Serializable { + + private String key; + private String value; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/PreparedResult.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/PreparedResult.java new file mode 100644 index 000000000..493e94417 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/PreparedResult.java @@ -0,0 +1,28 @@ + +package eu.dnetlib.dhp.actionmanager.bipfinder; + +import java.io.Serializable; + +/** + * Subset of the information of the generic results that are needed to create the atomic action + */ +public class PreparedResult implements Serializable { + private String id; // openaire id + private String value; // doi + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/Score.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/Score.java new file mode 100644 index 000000000..7cc21b44d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/Score.java @@ -0,0 +1,30 @@ + +package eu.dnetlib.dhp.actionmanager.bipfinder; + +import java.io.Serializable; +import java.util.List; + +/** + * represents the score in the input file + */ +public class Score implements Serializable { + + private String id; + private List unit; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public List getUnit() { + return unit; + } + + public void setUnit(List unit) { + this.unit = unit; + } +}