dhp-graph-dump/dump/src/main/java/eu/dnetlib/dhp/skgif/DumpResult.java

305 lines
12 KiB
Java

package eu.dnetlib.dhp.skgif;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.skgif.Utils.getOrcid;
import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.skgif.beans.PartialResearchProduct;
import eu.dnetlib.dhp.skgif.beans.RelationPerProduct;
import eu.dnetlib.dhp.skgif.model.*;
import eu.dnetlib.dhp.skgif.model.AccessRight;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
/**
* @author miriam.baglioni
* @Date 06/02/24
*/
public class DumpResult implements Serializable {
private static final Logger log = LoggerFactory.getLogger(DumpResult.class);
/**
 * Entry point of the job: parses the command line arguments, removes the output directory and
 * runs {@link #mapResult} inside a Spark session.
 *
 * <p>Arguments read from the parsed command line: {@code isSparkSessionManaged} (optional,
 * defaults to {@code true}), {@code sourcePath}, {@code outputPath} and {@code workingDir}.
 *
 * @param args the command line arguments
 * @throws Exception if the parameter definition cannot be loaded or parsed, or if the job fails
 */
public static void main(String[] args) throws Exception {
    final String parametersResource = "/eu/dnetlib/dhp/oa/graph/dump/dump_result_parameters.json";

    final String jsonConfiguration;
    // try-with-resources: the classpath stream must be closed once it has been read.
    // The resource name is absolute, so it is resolved through this class rather than an unrelated one.
    try (java.io.InputStream parameters = DumpResult.class.getResourceAsStream(parametersResource)) {
        if (parameters == null) {
            // fail with an explicit message instead of a bare NullPointerException
            throw new IllegalStateException(
                "Parameter definition not found on the classpath: " + parametersResource);
        }
        // explicit charset: do not depend on the platform default encoding
        jsonConfiguration = IOUtils.toString(parameters, java.nio.charset.StandardCharsets.UTF_8);
    }

    final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
    parser.parseArgument(args);

    Boolean isSparkSessionManaged = Optional
        .ofNullable(parser.get("isSparkSessionManaged"))
        .map(Boolean::valueOf)
        .orElse(Boolean.TRUE);
    log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

    final String inputPath = parser.get("sourcePath");
    log.info("inputPath: {}", inputPath);

    final String outputPath = parser.get("outputPath");
    log.info("outputPath: {}", outputPath);

    final String workingDir = parser.get("workingDir");
    log.info("workingDir: {}", workingDir);

    SparkConf conf = new SparkConf();

    runWithSparkSession(
        conf,
        isSparkSessionManaged,
        spark -> {
            Utils.removeOutputDir(spark, outputPath);
            mapResult(spark, inputPath, outputPath, workingDir);
        });
}
/**
 * For each result, emits the id, the journal (when it exists), the instance and the datasource
 * hosting the instance, and assembles them into research products.
 *
 * <p>The work is done in three consecutive steps, each one reading what the previous one wrote
 * under {@code workingDir}:
 * <ol>
 * <li>selection of the semantically relevant relations going from a result to another entity;</li>
 * <li>merge of the relations and of the manifestations that belong to the same result;</li>
 * <li>dump of the results, enriched with the information on relations and manifestations.</li>
 * </ol>
 *
 * @param spark the active Spark session
 * @param inputPath the path of the graph to be dumped
 * @param outputPath the output path; it is not used by this method
 * @param workingDir the directory used for the intermediate data sets
 * @param <R> the type of result
 */
public static <R extends Result> void mapResult(
    SparkSession spark,
    String inputPath,
    String outputPath,
    String workingDir) {

    // step 1: keep only the relations that are semantically relevant for the results
    selectRelations(spark, inputPath, workingDir);

    // step 2: put together relations and manifestations of the same result
    getRelationAndManifestation(spark, workingDir, inputPath);

    // step 3: dump the results enriched with relations and manifestations
    dumpResult(spark, inputPath, workingDir);
}
/**
 * For every result type, merges the manifestations of each result with the relations aggregated
 * for that result and writes one {@link PartialResearchProduct} per result under
 * {@code workingDir + <type name> + "/partialresearchproduct"}.
 *
 * <p>The manifestations are joined with the "Journal archive" datasources <b>before</b> being
 * grouped by result. Datasets and the SparkSession can only be used in driver code, so they must
 * not be referenced inside the function passed to {@code mapGroups}: inside that function only
 * plain objects are handled.
 *
 * @param spark the active Spark session
 * @param workingDir the directory holding the intermediate data sets (aggregated relations and
 *        manifestations) and receiving the partial research products
 * @param inputPath the path of the graph, from which the datasources are read
 */
private static void getRelationAndManifestation(SparkSession spark, String workingDir, String inputPath) {
    // same spelling used by selectRelations when it writes the aggregated relations
    Dataset<RelationPerProduct> aggRelations = Utils
        .readPath(spark, workingDir + "/aggrelation", RelationPerProduct.class);

    // the same for every result type: defined once. The constant-first comparison is null safe
    Dataset<Datasource> datasource = Utils
        .readPath(spark, inputPath + "/datasource", Datasource.class)
        .filter(
            (FilterFunction<Datasource>) d -> d.getEosctype() != null &&
                "Journal archive".equalsIgnoreCase(d.getEosctype().getClassname()));

    ModelSupport.entityTypes
        .keySet()
        .parallelStream()
        .filter(ModelSupport::isResult)
        .forEach(e -> {
            Dataset<EmitPerManifestation> man = Utils
                .readPath(spark, workingDir + e.name() + "/manifestation", EmitPerManifestation.class);

            // one PartialResearchProduct per result, holding all the manifestations of the result
            Dataset<PartialResearchProduct> partialProducts = man
                .joinWith(datasource, man.col("hostedBy").equalTo(datasource.col("id")), "left")
                .groupByKey(
                    (MapFunction<Tuple2<EmitPerManifestation, Datasource>, String>) t2 -> t2
                        ._1()
                        .getResultId(),
                    Encoders.STRING())
                .mapGroups(
                    (MapGroupsFunction<String, Tuple2<EmitPerManifestation, Datasource>, PartialResearchProduct>) (
                        k, v) -> {
                        PartialResearchProduct prp = new PartialResearchProduct();
                        prp.setResultId(k);
                        List<Manifestation> manifestations = new ArrayList<>();
                        v.forEachRemaining(t2 -> manifestations.add(getManifestation(t2._1(), t2._2())));
                        prp.setManifestations(manifestations);
                        return prp;
                    }, Encoders.bean(PartialResearchProduct.class));

            partialProducts
                .joinWith(
                    aggRelations, partialProducts.col("resultId").equalTo(aggRelations.col("resultId")), "left")
                .map((MapFunction<Tuple2<PartialResearchProduct, RelationPerProduct>, PartialResearchProduct>) t2 -> {
                    PartialResearchProduct prp = t2._1();
                    RelationPerProduct rpp = t2._2();
                    // left join: a result without relevant relations has no right side
                    if (rpp != null) {
                        prp.setRelatedProducts(rpp.getRelatedProduct());
                        prp.setRelevantOrganizations(rpp.getOrganizations());
                        prp.setFunding(rpp.getFunding());
                    }
                    return prp;
                }, Encoders.bean(PartialResearchProduct.class))
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                // spelled exactly as dumpResult reads it: on a case sensitive file system
                // "partialResearchproduct" and "partialresearchproduct" are two different directories
                .json(workingDir + e.name() + "/partialresearchproduct");
        });
}

/**
 * Maps one emitted instance to a {@link Manifestation}.
 *
 * @param epm the instance emitted for a result, together with the journal and the publisher
 * @param journalArchive the "Journal archive" datasource hosting the instance, or {@code null}
 *        when the instance is not hosted by such a datasource
 * @return the manifestation describing the instance
 */
private static Manifestation getManifestation(EmitPerManifestation epm, Datasource journalArchive) {
    Manifestation manifestation = new Manifestation();

    // NOTE(review): assumes Manifestation exposes setProductLocalType next to
    // setProductLocalTypeSchema. Previously the schema setter was called twice, so the class name
    // was immediately overwritten by the schema name - confirm against the bean
    manifestation.setProductLocalType(epm.getInstance().getInstancetype().getClassname());
    manifestation.setProductLocalTypeSchema(epm.getInstance().getInstancetype().getSchemename());

    if (epm.getInstance().getDateofacceptance() != null) {
        manifestation
            .setDates(
                Arrays
                    .asList(
                        Dates.newInstance(epm.getInstance().getDateofacceptance().getValue(), "publishing")));
    }

    if (epm.getInstance().getRefereed() != null) {
        switch (epm.getInstance().getRefereed().getClassid()) {
            case "0000":
                manifestation.setPeerReview(PeerReview.UNAVAILABLE.label);
                break;
            case "0001":
                manifestation.setPeerReview(PeerReview.PEER_REVIEWED.label);
                break;
            case "0002":
                manifestation.setPeerReview(PeerReview.NON_PEER_REVIEWED.label);
                break;
            default:
                // any other code leaves the peer review unset
                break;
        }
    }

    manifestation.setMetadataCuration("unavailable");

    if (epm.getInstance().getAccessright() != null) {
        switch (epm.getInstance().getAccessright().getClassid()) {
            case "OPEN":
            case "OPEN DATA":
            case "OPEN SOURCE":
                manifestation.setAccessRight(AccessRight.OPEN.label);
                break;
            case "CLOSED":
                manifestation.setAccessRight(AccessRight.CLOSED.label);
                break;
            case "RESTRICTED":
                manifestation.setAccessRight(AccessRight.RESTRICTED.label);
                break;
            case "EMBARGO":
            case "12MONTHS":
            case "6MONTHS":
                manifestation.setAccessRight(AccessRight.EMBARGO.label);
                break;
            default:
                manifestation.setAccessRight(AccessRight.UNAVAILABLE.label);
        }
    }

    if (epm.getInstance().getLicense() != null) {
        manifestation.setLicence(epm.getInstance().getLicense().getValue());
    }

    // only the first url and the first pid are reported; both lists may be missing or empty
    if (epm.getInstance().getUrl() != null && !epm.getInstance().getUrl().isEmpty()) {
        manifestation.setUrl(epm.getInstance().getUrl().get(0));
    }
    if (epm.getInstance().getPid() != null && !epm.getInstance().getPid().isEmpty()) {
        manifestation.setPid(epm.getInstance().getPid().get(0).getValue());
    }

    if (epm.getInstance().getHostedby() != null) {
        String hostedByKey = epm.getInstance().getHostedby().getKey();
        // when the datasource side of the join is there, biblio and venue are available;
        // otherwise only the other values are set
        if (journalArchive != null) {
            manifestation.setBiblio(getBiblio(epm));
            manifestation.setVenue("venue_______::" + DHPUtils.md5(hostedByKey));
        }
        manifestation.setHostingDatasource("datasource__::" + DHPUtils.md5(hostedByKey));
    }

    return manifestation;
}
/**
 * Builds the bibliographic information of a manifestation.
 *
 * <p>The publisher is always copied. Edition, issue, volume, start page and end page are copied
 * from the journal, which is emitted only when it exists: when it is missing those fields are
 * left unset instead of failing with a NullPointerException.
 *
 * @param epm the emitted manifestation carrying the journal and the publisher
 * @return the bibliographic information, never {@code null}
 */
private static Biblio getBiblio(EmitPerManifestation epm) {
    Biblio biblio = new Biblio();
    biblio.setPublisher(epm.getPublisher());
    if (epm.getJournal() != null) {
        biblio.setEdition(epm.getJournal().getEdition());
        biblio.setIssue(epm.getJournal().getIss());
        biblio.setVolume(epm.getJournal().getVol());
        biblio.setStartPage(epm.getJournal().getSp());
        biblio.setEndPage(epm.getJournal().getEp());
    }
    return biblio;
}
/**
 * For every result type, maps the results to {@link ResearchProduct}s, enriches them with the
 * relations and the manifestations collected in the matching {@link PartialResearchProduct} and
 * writes them under {@code workingDir + <type name> + "/researchproduct"}.
 *
 * @param spark the active Spark session
 * @param inputPath the path of the graph, from which the results are read
 * @param workingDir the directory holding the partial research products and receiving the
 *        research products
 * @param <R> the type of result
 */
private static <R extends Result> void dumpResult(SparkSession spark, String inputPath, String workingDir) {
    ModelSupport.entityTypes
        .keySet()
        .parallelStream()
        .filter(ModelSupport::isResult)
        .forEach(e -> {
            Class<R> resultClazz = ModelSupport.entityTypes.get(e);
            // explicit separator, as for the other entities read from inputPath
            // ("/datasource", "/relation")
            Dataset<R> results = Utils.readPath(spark, inputPath + "/" + e.name(), resultClazz);
            // NOTE(review): must be spelled exactly as the directory written by
            // getRelationAndManifestation; directory names are case sensitive on HDFS
            Dataset<PartialResearchProduct> prr = Utils
                .readPath(spark, workingDir + e.name() + "/partialresearchproduct", PartialResearchProduct.class);

            results
                .joinWith(prr, results.col("id").equalTo(prr.col("resultId")), "left")
                .map((MapFunction<Tuple2<R, PartialResearchProduct>, ResearchProduct>) t2 -> {
                    ResearchProduct rp = ResultMapper.map(t2._1());
                    PartialResearchProduct partial = t2._2();
                    // left join: a result without a partial research product has no right side
                    if (partial != null) {
                        rp.setRelatedProducts(partial.getRelatedProducts());
                        rp.setFunding(partial.getFunding());
                        rp.setRelevantOrganizations(partial.getRelevantOrganizations());
                        // taken from the partial product: copying rp.getManifestations() onto rp
                        // itself would never bring the manifestations in
                        rp.setManifestations(partial.getManifestations());
                    }
                    return rp;
                }, Encoders.bean(ResearchProduct.class))
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                .json(workingDir + e.name() + "/researchproduct");
        });
}
/**
 * Selects the relations that are relevant for the dump and aggregates them by source.
 *
 * <p>Relations deleted by inference or invisible are discarded, and only the relation classes
 * listed in the filter below are kept. For each source one {@link RelationPerProduct} is written
 * under {@code workingDir + "/aggrelation"}:
 * <ul>
 * <li>{@code HasAuthorInstitution} targets are added to the organizations;</li>
 * <li>{@code IsProducedBy} targets are added to the funding;</li>
 * <li>the targets of every other class are grouped by relation class and added to the related
 * products.</li>
 * </ul>
 * The target identifiers are replaced by a type prefix followed by the md5 of the target.
 *
 * @param spark the active Spark session
 * @param inputPath the path of the graph, from which the relations are read
 * @param workingDir the directory receiving the aggregated relations
 */
private static void selectRelations(SparkSession spark, String inputPath, String workingDir) {
    Dataset<Relation> relation = spark
        .read()
        .json(inputPath + "/relation")
        .as(Encoders.bean(Relation.class))
        .filter("dataInfo.deletedbyinference != true and dataInfo.invisible != true")
        // every alternative must be separated by "or": without it the expression cannot be parsed
        .filter(
            "relClass == 'HasAuthorInstitution' or relClass == 'IsProducedBy' or " +
                "relClass == 'IsSupplementedBy' or relClass == 'IsDocumentedBy' or relClass == 'IsPartOf' or " +
                "relClass == 'IsNewVersionOf' or relClass == 'Cites'");

    relation
        .groupByKey((MapFunction<Relation, String>) Relation::getSource, Encoders.STRING())
        .mapGroups((MapGroupsFunction<String, Relation, RelationPerProduct>) (k, v) -> {
            // NOTE(review): assumes RelationPerProduct initializes its lists - confirm against the bean
            RelationPerProduct rpp = new RelationPerProduct();
            rpp.setResultId(k);
            // targets of the product-to-product relations, grouped by relation class
            Map<String, List<String>> remainingRelations = new HashMap<>();
            while (v.hasNext()) {
                Relation rel = v.next();
                String target = rel.getTarget();
                String relClass = rel.getRelClass();
                // Locale.ROOT: the match must not depend on the default locale of the JVM
                switch (relClass.toLowerCase(Locale.ROOT)) {
                    case "hasauthorinstitution":
                        rpp.getOrganizations().add("organization::" + DHPUtils.md5(target));
                        break;
                    case "isproducedby":
                        rpp.getFunding().add("grant_______::" + DHPUtils.md5(target));
                        break;
                    default:
                        remainingRelations
                            .computeIfAbsent(relClass, key -> new ArrayList<>())
                            .add("product_____::" + DHPUtils.md5(target));
                }
            }
            for (Map.Entry<String, List<String>> entry : remainingRelations.entrySet()) {
                rpp.getRelatedProduct().add(Relations.newInstance(entry.getKey(), entry.getValue()));
            }
            return rpp;
        }, Encoders.bean(RelationPerProduct.class))
        .write()
        .mode(SaveMode.Overwrite)
        .option("compression", "gzip")
        .json(workingDir + "/aggrelation");
}
}