dhp-graph-dump/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/filterentities/SelectLOT1Entities.java

134 lines
5.6 KiB
Java

package eu.dnetlib.dhp.oa.graph.dump.filterentities;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.col;
import java.io.Serializable;
import java.util.Optional;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
/**
* @author miriam.baglioni
* @Date 20/03/24
*/
public class SelectLOT1Entities implements Serializable {
private static final Logger log = LoggerFactory.getLogger(SelectLOT1Entities.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
FilterEntities.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/skgif/eosc_entities_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String filterPath = parser.get("filterPath");
log.info("filterPath: {}", filterPath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
selectEntities(spark, inputPath, filterPath);
});
}
private static <R extends Result> void selectEntities(SparkSession spark, String inputPath, String filterPath) {
selectPublications(spark,inputPath,filterPath);
selectDataset(spark,inputPath,filterPath);
selectSoftware(spark,inputPath,filterPath);
selectOthers(spark,inputPath,filterPath);
}
private static void selectOthers(SparkSession spark, String inputPath, String filterPath) {
spark.read().schema(Encoders.bean(OtherResearchProduct.class).schema())
.json(inputPath + "otherresearchproduct")
.where("datainfo.deletedbyinference != true AND datainfo.invisible != true")
.selectExpr("id", "instance", "explode(pid) as pid").where("pid.qualifier.classid IN ('doi', 'handle')") // filter by pid type
.selectExpr("id", "explode(instance) as instance")
.withColumn("CCL", expr("CASE WHEN instance.license.value LIKE 'CC%' OR instance.license.value LIKE '%/creativecommons.org/%' THEN 1 ELSE 0 END"))
.groupBy("id")
.agg(max(col("CCL")).as("CCL"))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.parquet(filterPath + "otherresearchproduct_ids");
}
private static void selectSoftware(SparkSession spark, String inputPath, String filterPath) {
spark.read().schema(Encoders.bean(Software.class).schema())
.json(inputPath + "software")
.where("datainfo.deletedbyinference != true AND datainfo.invisible != true")
.selectExpr("id", "instance", "explode(pid) as pid").where("pid.qualifier.classid IN ('doi', 'swhid')") // filter by pid type
.selectExpr("id", "explode(instance) as instance")
.withColumn("CCL", expr("CASE WHEN instance.license.value LIKE 'CC%' OR instance.license.value LIKE '%/creativecommons.org/%' THEN 1 ELSE 0 END"))
.groupBy("id")
.agg(max(col("CCL")).as("CCL"))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.parquet(filterPath + "software_ids");
}
private static void selectDataset(SparkSession spark, String inputPath, String filterPath) {
spark.read().schema(Encoders.bean(Dataset.class).schema())
.json(inputPath + "dataset")
.where("datainfo.deletedbyinference != true AND datainfo.invisible != true")
.selectExpr("id", "instance", "explode(pid) as pid").where("pid.qualifier.classid IN ('doi', 'handle', 'pdb', 'ena', 'uniprot')") // filter by pid type
.selectExpr("id", "explode(instance) as instance")
.withColumn("CCL", expr("CASE WHEN instance.license.value LIKE 'CC%' OR instance.license.value LIKE '%/creativecommons.org/%' THEN 1 ELSE 0 END"))
.groupBy("id")
.agg(max(col("CCL")).as("CCL"))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.parquet(filterPath + "dataset_ids");
}
private static void selectPublications(SparkSession spark, String inputPath, String filterPath) {
spark.read().schema(Encoders.bean(Publication.class).schema())
.json(inputPath + "publication")
.where("datainfo.deletedbyinference != true AND datainfo.invisible != true")
.selectExpr("id", "instance", "explode(pid) as pid").where("pid.qualifier.classid IN ('doi', 'arXiv', 'pmid', 'handle')") // filter by pid type
.selectExpr("id", "explode(instance) as instance").where("instance.instancetype.classname IN('Book', 'Article', 'Journal', 'Data Paper', 'Software Paper', 'Preprint', 'Part of book or chapter of book', 'Thesis', 'Master thesis', 'Bachelor thesis', 'Doctoral thesis', 'Conference object', 'Research', 'Other literature type')")
.withColumn("CCL", expr("CASE WHEN instance.license.value LIKE 'CC%' OR instance.license.value LIKE '%/creativecommons.org/%' THEN 1 ELSE 0 END"))
.groupBy("id")
.agg(max(col("CCL")).as("CCL"))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.parquet(filterPath + "publication_ids");
}
}