dhp-graph-dump/dump/src/main/java/eu/dnetlib/dhp/oa/graph/dump/skgif/EmitFromEntities.java

package eu.dnetlib.dhp.oa.graph.dump.skgif;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.skgif.model.*;
import eu.dnetlib.dhp.skgif.model.AccessRight;
import scala.Tuple2;
/**
 * Emits the SKG-IF entities (persons, topics and the datasource/publisher pairs) starting from the
 * results in the OpenAIRE graph. The min-* projections for projects, organizations and datasources
 * are implemented below but currently disabled in main.
 *
 * @author miriam.baglioni
 * @Date 06/02/24
 */
public class EmitFromEntities implements Serializable {
private static final Logger log = LoggerFactory.getLogger(EmitFromEntities.class);
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
EmitFromEntities.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/dump/skgif/emit_biblio_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("sourcePath");
log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
final String workingDir = parser.get("workingDir");
log.info("workingDir: {}", workingDir);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
emitFromResult(spark, inputPath, outputPath, workingDir);
// emitFromDatasource(spark, inputPath, workingDir);
// emitFromOrganization(spark, inputPath, workingDir);
// emitFromProject(spark, inputPath, workingDir);
});
}
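
/**
 * Maps every project that is not deleted by inference to its MinGrant projection and writes the
 * result, gzip-compressed, to workingDir/minGrant. Currently not invoked from main.
 */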
private static void emitFromProject(SparkSession spark, String inputPath, String workingDir) {
Utils
.readPath(spark, inputPath + "project", Project.class)
.filter((FilterFunction<Project>) p -> !p.getDataInfo().getDeletedbyinference())
.map(
(MapFunction<Project, MinGrant>) p -> Utils.getMinGrant(p),
Encoders.bean(MinGrant.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "/minGrant");
}
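
/**
 * Maps every organization that is not deleted by inference to its MinOrganization projection and
 * writes the result, gzip-compressed, to workingDir/minOrganization. Currently not invoked from main.
 */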
private static void emitFromOrganization(SparkSession spark, String inputPath, String workingDir) {
Utils
.readPath(spark, inputPath + "organization", Organization.class)
.filter((FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference())
.map(
(MapFunction<Organization, MinOrganization>) o -> Utils.getMinOrganization(o),
Encoders.bean(MinOrganization.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "/minOrganization");
}
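
/**
 * Emits two projections of the datasources that are not deleted by inference: a MinVenue keyed on
 * the datasource identifier (workingDir/minDatasource) and, for datasources of EOSC type
 * "Journal archive", a MinVenue keyed on the journal ISSN, printed or online
 * (workingDir/minVenue). Currently not invoked from main.
 */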
private static void emitFromDatasource(SparkSession spark, String inputPath, String workingDir) {
Utils
.readPath(spark, inputPath + "datasource", Datasource.class)
.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference())
.map(
(MapFunction<Datasource, MinVenue>) d -> MinVenue
.newInstance(d.getId(), d.getOfficialname().getValue()),
Encoders.bean(MinVenue.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "/minDatasource");
Utils
.readPath(spark, inputPath + "datasource", Datasource.class)
.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference())
.filter(
(FilterFunction<Datasource>) d -> Optional.ofNullable(d.getEoscdatasourcetype()).isPresent() && d
.getEoscdatasourcetype()
.getClassid()
.equalsIgnoreCase("Journal archive"))
.map((MapFunction<Datasource, MinVenue>) d -> {
if (Optional.ofNullable(d.getJournal()).isPresent() &&
Optional.ofNullable(d.getJournal().getIssnPrinted()).isPresent()) {
return MinVenue
.newInstance(
Utils.getIdentifier(Prefixes.VENUE, d.getJournal().getIssnPrinted()),
d.getOfficialname().getValue());
}
if (Optional.ofNullable(d.getJournal()).isPresent() &&
Optional.ofNullable(d.getJournal().getIssnOnline()).isPresent()) {
return MinVenue
.newInstance(
Utils.getIdentifier(Prefixes.VENUE, d.getJournal().getIssnOnline()),
d.getOfficialname().getValue());
}
return null;
}, Encoders.bean(MinVenue.class))
.filter(Objects::nonNull)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "/minVenue");
}

/**
 * For each result emits the id, the journal (if any), the instances and the hostedby of each
 * instance; it currently delegates to the person, topic and datasource/publisher emitters.
 */
public static <R extends Result> void emitFromResult(SparkSession spark, String inputPath, String outputPath,
String workingDir) {
// emitManifestation(spark, inputPath, workingDir);
emitPerson(spark, inputPath, outputPath, workingDir);
emitTopic(spark, inputPath, outputPath, workingDir);
emitDatasourcePublisher(spark, inputPath, workingDir);
// emitMinProduct(spark, inputPath, workingDir);
}
// The publisher, like the journal information, is stored at the level of the result: we do not
// know which instance's hostedby.key refers to the journal. Every instance's hostedby is therefore
// paired with the publisher, and the pairs are then restricted to datasources of type "Journal archive".
private static void emitDatasourcePublisher(SparkSession spark, String inputPath, String workingDir) {
Dataset<Row> journalIds = spark
.read()
.schema(Encoders.bean(Datasource.class).schema())
.json((inputPath + "datasource"))
.filter(
"datainfo.deletedbyinference !=true false and " +
"eoscdatasourcetype.classid == 'Journal archive' ")
.select("id");
Dataset<Publication> result = spark
.read()
.schema(Encoders.bean(Publication.class).schema())
.json(inputPath + "publication")
.filter("datainfo.deletedbyinference != true ")
.as(Encoders.bean(Publication.class));
Dataset<Row> datasourcePublisher = result.flatMap((FlatMapFunction<Publication, Tuple2<String, String>>) r -> {
ArrayList<Tuple2<String, String>> dsPub = new ArrayList<>();
if (Optional.ofNullable(r.getJournal()).isPresent() &&
Optional.ofNullable(r.getPublisher()).isPresent()) {
for (Instance i : r.getInstance())
dsPub.add(new Tuple2<>(i.getHostedby().getKey(), r.getPublisher().getValue()));
}
return dsPub.iterator();
}, Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.selectExpr("_1 as hostedby", "_2 as publisher");
datasourcePublisher
.join(journalIds, datasourcePublisher.col("hostedby").equalTo(journalIds.col("id")), "leftsemi")
.distinct()
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "/datasourcePublisher");
}
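
/**
 * Maps each result (publication, dataset, software, other research product) to its MinProduct
 * projection, appending all of them to workingDir/minProduct. Currently not invoked from
 * emitFromResult.
 */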
private static <R extends Result> void emitMinProduct(SparkSession spark, String inputPath, String workingDir) {
Utils.removeOutputDir(spark, workingDir + "/minProduct");
ModelSupport.entityTypes.keySet().forEach(e -> {
if (ModelSupport.isResult(e)) {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Utils
.readPath(spark, inputPath + e.name(), resultClazz)
.map(
(MapFunction<R, MinProduct>) p -> Utils.getMinProduct(p),
Encoders.bean(MinProduct.class))
.write()
.mode(SaveMode.Append)
.option("compression", "gzip")
.json(workingDir + "/minProduct");
}
});
}
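
/**
 * Emits one Topic per FoS subject of each result, first per entity type under
 * workingDir/{entityType}/topic and then deduplicated by local identifier into outputPath/Topic.
 */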
private static <R extends Result> void emitTopic(SparkSession spark, String inputPath, String outputPath,
String workingDir) {
ModelSupport.entityTypes.keySet().forEach(e -> {
if (ModelSupport.isResult(e)) {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Utils
.readPath(spark, inputPath + e.name(), resultClazz)
.filter((FilterFunction<R>) r -> Optional.ofNullable(r.getSubject()).isPresent())
.flatMap(
(FlatMapFunction<R, Topic>) r -> r
.getSubject()
.stream()
.filter(
s -> s.getQualifier().getClassid().equalsIgnoreCase("fos")
// || s.getQualifier().getClassid().equalsIgnoreCase("sdg")
)
.map(s -> {
Topic t = new Topic();
t
.setLocal_identifier(
Utils
.getIdentifier(
Prefixes.TOPIC, s.getQualifier().getClassid() + s.getValue()));
t
.setIdentifiers(
Arrays
.asList(
Identifier.newInstance(s.getQualifier().getClassid(), s.getValue())));
t.setName(s.getValue());
return t;
})
.collect(Collectors.toList())
.iterator(),
Encoders.bean(Topic.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + e.name() + "/topic");
}
});
Dataset<Topic> topics = spark.emptyDataset(Encoders.bean(Topic.class));
for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
if (ModelSupport.isResult(entityType))
topics = topics.union(Utils.readPath(spark, workingDir + entityType.name() + "/topic", Topic.class));
}
topics
.groupByKey((MapFunction<Topic, String>) p -> p.getLocal_identifier(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Topic, Topic>) (k, v) -> v.next(), Encoders.bean(Topic.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/Topic");
}
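
/**
 * Emits one Persons record per author of each result, identified through the (inferred) ORCID when
 * available and through a temporary person identifier otherwise; records are written per entity
 * type under workingDir/{entityType}/person and then deduplicated by local identifier into
 * outputPath/Persons.
 */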
private static <R extends Result> void emitPerson(SparkSession spark, String inputPath, String outputPath,
String workingDir) {
ModelSupport.entityTypes.keySet().forEach(e -> {
if (ModelSupport.isResult(e)) {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Utils
.readPath(spark, inputPath + e.name(), resultClazz)
.flatMap((FlatMapFunction<R, Persons>) r -> {
List<Persons> authors = new ArrayList<>();
if (Optional.ofNullable(r.getAuthor()).isPresent() && r.getAuthor().size() > 0) {
int count = 0;
for (Author a : r.getAuthor()) {
count += 1;
Persons p = new Persons();
p.setFamily_name(a.getSurname());
p.setGiven_name(a.getName());
p.setFullname(a.getFullname());
String identifier = "";
if (Optional.ofNullable(a.getPid()).isPresent()) {
Tuple2<String, Boolean> orcid = Utils.getOrcid(a.getPid());
if (orcid != null) {
// identify the person through the (possibly inferred) ORCID
identifier = Utils.getIdentifier(Prefixes.PERSON, orcid._1() + orcid._2());
if (orcid._2())
p
.setIdentifiers(
Arrays.asList(Identifier.newInstance("orcid", orcid._1())));
else
p
.setIdentifiers(
Arrays
.asList(Identifier.newInstance("inferred_orcid", orcid._1())));
}
}
if (identifier.isEmpty()) {
// no usable ORCID: mint a temporary person identifier from the result id and the
// author's rank, falling back to the author's position in the author list
if (Optional.ofNullable(a.getRank()).isPresent()) {
identifier = Utils
.getIdentifier(Prefixes.TEMPORARY_PERSON, r.getId() + a.getRank());
} else {
identifier = Utils
.getIdentifier(Prefixes.TEMPORARY_PERSON, r.getId() + count);
}
}
p.setLocal_identifier(identifier);
authors.add(p);
}
}
return authors.iterator();
}, Encoders.bean(Persons.class))
.filter((FilterFunction<Persons>) p -> p != null)
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + e.name() + "/person");
}
});
Dataset<Persons> persons = spark.emptyDataset(Encoders.bean(Persons.class));
for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
if (ModelSupport.isResult(entityType))
persons = persons
.union(Utils.readPath(spark, workingDir + entityType.name() + "/person", Persons.class));
}
persons
.groupByKey((MapFunction<Persons, String>) p -> p.getLocal_identifier(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Persons, Persons>) (k, v) -> v.next(), Encoders.bean(Persons.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath + "/Persons");
}
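
/**
 * Emits one EmitPerManifestation record per result instance, enriched with the hosting datasource,
 * the publisher and, for publications, the journal; the records are then grouped by hostedBy,
 * keeping one per datasource. Currently not invoked from emitFromResult.
 */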
private static <R extends Result> void emitManifestation(SparkSession spark, String inputPath, String workingDir) {
ModelSupport.entityTypes.keySet().forEach(e -> {
if (ModelSupport.isResult(e)) {
Class<R> resultClazz = ModelSupport.entityTypes.get(e);
Utils
.readPath(spark, inputPath + e.name(), resultClazz)
.flatMap((FlatMapFunction<R, EmitPerManifestation>) p -> p.getInstance().stream().map(i -> {
EmitPerManifestation epb = new EmitPerManifestation();
epb.setResultId(p.getId());
setInstanceFields(epb, i);
epb.setHostedBy(i.getHostedby().getKey());
epb.setHostedbyvalue(i.getHostedby().getValue());
epb
.setPublisher(
Optional
.ofNullable(p.getPublisher())
.map(v -> v.getValue())
.orElse(""));
if (p.getClass() == Publication.class) {
epb.setJournal(((Publication) p).getJournal());
}
return epb;
}).collect(Collectors.toList()).iterator(), Encoders.bean(EmitPerManifestation.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + e.name() + "/manifestation");
}
});
Dataset<EmitPerManifestation> emitPerManifestationDataset = Utils
.readPath(
spark, workingDir + "software/manifestation", EmitPerManifestation.class)
.union(
Utils
.readPath(
spark, workingDir + "dataset/manifestation", EmitPerManifestation.class))
.union(
Utils
.readPath(
spark, workingDir + "publication/manifestation", EmitPerManifestation.class))
.union(
Utils
.readPath(
spark, workingDir + "otherresearchproduct/manifestation", EmitPerManifestation.class));
emitPerManifestationDataset
.groupByKey((MapFunction<EmitPerManifestation, String>) p -> p.getHostedBy(), Encoders.STRING())
.mapGroups(
(MapGroupsFunction<String, EmitPerManifestation, EmitPerManifestation>) (k, v) -> v.next(),
Encoders.bean(EmitPerManifestation.class))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(workingDir + "/datasourcePublisher");
}
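
// Copies type, peer-review status, access right, licence, first url, first pid and publishing
// date from the instance to the manifestation record.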
private static void setInstanceFields(EmitPerManifestation epb, Instance i) {
epb.setProduct_local_type(i.getInstancetype().getClassname());
epb.setProduct_local_type_schema(i.getInstancetype().getSchemename());
epb.setPeer_reviewed(getPeerReviewed(i));
epb.setAccess_right(getAccessRight(i));
epb
.setLicence(
Optional
.ofNullable(i.getLicense())
.map(value -> value.getValue())
.orElse(null));
if (Optional.ofNullable(i.getUrl()).isPresent() && i.getUrl().size() > 0)
epb.setUrl(i.getUrl().get(0));
else
epb.setUrl(null);
if (Optional.ofNullable(i.getPid()).isPresent() && i.getPid().size() > 0) {
epb.setPid(i.getPid().get(0).getValue());
}
if (Optional.ofNullable(i.getDateofacceptance()).isPresent())
epb.setPublishing_date(i.getDateofacceptance().getValue());
}
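
// Maps the OpenAIRE access-right class ids onto the SKG-IF access-right labels; anything
// unrecognised (or missing) is reported as unavailable.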
private static String getAccessRight(Instance i) {
if (Optional.ofNullable(i.getAccessright()).isPresent())
switch (i.getAccessright().getClassid()) {
case "OPEN":
case "OPEN DATA":
case "OPEN SOURCE":
return AccessRight.OPEN.label;
case "CLOSED":
return AccessRight.CLOSED.label;
case "RESTRICTED":
return AccessRight.RESTRICTED.label;
case "EMBARGO":
case "12MONTHS":
case "6MONTHS":
return AccessRight.EMBARGO.label;
default:
return AccessRight.UNAVAILABLE.label;
}
return AccessRight.UNAVAILABLE.label;
}
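
// Maps the refereed class ids (0000 unavailable, 0001 peer reviewed, 0002 not peer reviewed)
// onto the SKG-IF peer-review labels.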
private static String getPeerReviewed(Instance i) {
if (Optional.ofNullable(i.getRefereed()).isPresent())
switch (i.getRefereed().getClassid()) {
case "0000":
return PeerReview.UNAVAILABLE.label;
case "0001":
return PeerReview.PEER_REVIEWED.label;
case "0002":
return PeerReview.NON_PEER_REVIEWED.label;
}
return PeerReview.UNAVAILABLE.label;
}
}