package eu.dnetlib.dhp.oa.graph.dump.skgif;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.skgif.model.*;

import scala.Tuple2;

/**
 * Emits, from the result entities of the graph, the SKG-IF entities that are not results themselves:
 * the Persons derived from the authors, the Topics derived from the FOS and SDG subjects, and the
 * (datasource, publisher) pairs for journal archives.
 *
 * @author miriam.baglioni
 * @Date 06/02/24
 */
public class EmitFromEntities implements Serializable {
private static final Logger log = LoggerFactory.getLogger(EmitFromEntities.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
EmitFromEntities.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/skgif/emit_biblio_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
emitFromResult(spark, inputPath, outputPath, workingDir);
});
}
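
	/**
	 * Emits the entities derived from the graph results: the Persons built from the authors,
	 * the Topics built from the FOS and SDG subjects, and the (datasource, publisher) pairs
	 * for publications hosted by journal archives.
	 */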
public static <R extends Result> void emitFromResult(SparkSession spark, String inputPath, String outputPath,
String workingDir) {
emitPerson(spark, inputPath, outputPath, workingDir);
emitTopic(spark, inputPath, outputPath, workingDir);
emitDatasourcePublisher(spark, inputPath, workingDir);
}
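
	/**
	 * Extracts the (hostedby, publisher) pairs from the publications and keeps only those whose
	 * hosting datasource is classified as 'Journal archive'; the pairs are written to
	 * workingDir/datasourcePublisher.
	 */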
private static void emitDatasourcePublisher(SparkSession spark, String inputPath, String workingDir) {
		Dataset<Row> journalIds = spark
			.read()
			.schema(Encoders.bean(Datasource.class).schema())
			.json(inputPath + "datasource")
			.filter(
				"datainfo.deletedbyinference != true and " +
					"eoscdatasourcetype.classid == 'Journal archive'")
			.select("id");
Dataset<Publication> result = spark
.read()
.schema(Encoders.bean(Publication.class).schema())
.json(inputPath + "publication")
.filter("datainfo.deletedbyinference != true ")
.as(Encoders.bean(Publication.class));
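
		// build (hostedby datasource id, publisher name) pairs from the publications declaring
		// both a journal and a publisher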
Dataset<Row> datasourcePublisher = result.flatMap((FlatMapFunction<Publication, Tuple2<String, String>>) r -> {
ArrayList<Tuple2<String, String>> dsPub = new ArrayList<>();
			if (Optional.ofNullable(r.getJournal()).isPresent() &&
				Optional.ofNullable(r.getPublisher()).isPresent() &&
				Optional.ofNullable(r.getInstance()).isPresent()) {
				// the instance list may be missing: guard it to avoid NPEs while collecting the hostedby keys
				for (Instance i : r.getInstance())
					dsPub.add(new Tuple2<>(i.getHostedby().getKey(), r.getPublisher().getValue()));
			}
return dsPub.iterator();
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.selectExpr("_1 as hostedby", "_2 as publisher");
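
		// keep only the pairs whose hosting datasource is a journal archive (left-semi join on the datasource id)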
datasourcePublisher
.join(journalIds, datasourcePublisher.col("hostedby").equalTo(journalIds.col("id")), "leftsemi")
.distinct()
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "/datasourcePublisher");
}
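
	/**
	 * For each result entity type, maps the FOS and SDG subjects to Topic entities and writes them
	 * to the working dir; the per-type datasets are then merged and deduplicated by local identifier
	 * into outputPath/topics.
	 */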
private static <R extends Result> void emitTopic(SparkSession spark, String inputPath, String outputPath,
String workingDir) {
ModelSupport.entityTypes.keySet().forEach(e -> {
if (ModelSupport.isResult(e)) {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Utils
.readPath(spark, inputPath + e.name(), resultClazz)
.filter((FilterFunction<R>) r -> Optional.ofNullable(r.getSubject()).isPresent())
.flatMap(
(FlatMapFunction<R, Topic>) r -> r
.getSubject()
.stream()
.filter(
s -> s.getQualifier().getClassid().equalsIgnoreCase("fos")
|| s.getQualifier().getClassid().equalsIgnoreCase("sdg"))
.map(s -> {
Topic t = new Topic();
t
.setLocal_identifier(
Utils
.getIdentifier(
Prefixes.TOPIC, s.getQualifier().getClassid() + s.getValue()));
t
.setIdentifiers(
Arrays
.asList(
Identifier.newInstance(s.getQualifier().getClassid(), s.getValue())));
t.setName(s.getValue());
return t;
})
.collect(Collectors.toList())
.iterator(),
Encoders.bean(Topic.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + e.name() + "/topic");
}
});
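
		// merge the per-entity-type topic datasets written to the working dir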
Dataset<Topic> topics = spark.emptyDataset(Encoders.bean(Topic.class));
for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
if (ModelSupport.isResult(entityType))
topics = topics.union(Utils.readPath(spark, workingDir + entityType.name() + "/topic", Topic.class));
}
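
		// deduplicate the topics by local identifier, keeping one representative per key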
topics
.groupByKey((MapFunction<Topic, String>) p -> p.getLocal_identifier(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Topic, Topic>) (k, v) -> v.next(), Encoders.bean(Topic.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/topics");
}
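
	/**
	 * For each result entity type, maps the authors to Persons entities and writes them to the
	 * working dir; the per-type datasets are then merged and deduplicated by local identifier into
	 * outputPath/persons. Authors with an ORCID get a person identifier derived from it, the others
	 * a temporary identifier derived from the result id and the author rank (or position).
	 */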
private static <R extends Result> void emitPerson(SparkSession spark, String inputPath, String outputPath,
String workingDir) {
ModelSupport.entityTypes.keySet().forEach(e -> {
if (ModelSupport.isResult(e)) {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Utils
.readPath(spark, inputPath + e.name(), resultClazz)
.flatMap((FlatMapFunction<R, Persons>) r -> {
List<Persons> authors = new ArrayList<>();
if (Optional.ofNullable(r.getAuthor()).isPresent() && r.getAuthor().size() > 0) {
int count = 0;
for (Author a : r.getAuthor()) {
count += 1;
Persons p = new Persons();
p.setFamily_name(a.getSurname());
p.setGiven_name(a.getName());
p.setFullname(a.getFullname());
								// resolve the author ORCID, if any, from the pids
								Tuple2<String, Boolean> orcid = null;
								if (Optional.ofNullable(a.getPid()).isPresent()) {
									orcid = eu.dnetlib.dhp.oa.graph.dump.skgif.Utils
										.getOrcid(a.getPid());
								}
								String identifier;
								if (orcid != null) {
									identifier = Utils.getIdentifier(Prefixes.PERSON, orcid._1() + orcid._2());
									if (orcid._2())
										p
											.setIdentifiers(
												Arrays.asList(Identifier.newInstance("orcid", orcid._1())));
									else
										p
											.setIdentifiers(
												Arrays.asList(Identifier.newInstance("inferred_orcid", orcid._1())));
								} else {
									// no ORCID available (missing or unresolved pid): fall back to a temporary
									// person identifier derived from the result id and the author rank/position
									if (Optional.ofNullable(a.getRank()).isPresent()) {
										identifier = Utils
											.getIdentifier(Prefixes.TEMPORARY_PERSON, r.getId() + a.getRank());
									} else {
										identifier = Utils
											.getIdentifier(Prefixes.TEMPORARY_PERSON, r.getId() + count);
									}
								}
								p.setLocal_identifier(identifier);
authors.add(p);
}
}
return authors.iterator();
}, Encoders.bean(Persons.class))
.filter((FilterFunction<Persons>) p -> p != null)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + e.name() + "/person");
}
});
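
		// merge the per-entity-type person datasets written to the working dir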
Dataset<Persons> persons = spark.emptyDataset(Encoders.bean(Persons.class));
for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
if (ModelSupport.isResult(entityType))
persons = persons
.union(Utils.readPath(spark, workingDir + entityType.name() + "/person", Persons.class));
}
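
		// deduplicate the persons by local identifier, keeping one representative per key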
persons
.groupByKey((MapFunction<Persons, String>) p -> p.getLocal_identifier(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Persons, Persons>) (k, v) -> v.next(), Encoders.bean(Persons.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/persons");
}
}