package eu.dnetlib.dhp.oa.graph.dump.skgif;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.Dataset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.*;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.skgif.model.*;
import eu.dnetlib.dhp.skgif.model.AccessRight;
import scala.Tuple2;

/**
 * @author miriam.baglioni
 * @Date 06/02/24
 */
public class DumpResult implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(DumpResult.class);
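
    /**
     * Reads the job configuration (sourcePath, workingDir, outputPath, isSparkSessionManaged) from
     * dump_result_parameters.json and triggers the dump. A minimal invocation sketch (the parameter names come
     * from the code below; the jar name is an assumption):
     *
     * <pre>
     * spark-submit --class eu.dnetlib.dhp.oa.graph.dump.skgif.DumpResult dhp-graph-dump.jar \
     *   --sourcePath /tmp/graph/ --workingDir /tmp/skgif/working/ --outputPath /tmp/skgif/dump/
     * </pre>
     */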
    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils
            .toString(
                DumpResult.class
                    .getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/graph/dump/dump_result_parameters.json"));

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String workingDir = parser.get("workingDir");
        log.info("workingDir: {}", workingDir);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();

        runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> {
                Utils.removeOutputDir(spark, workingDir + "aggrelation");
                mapResult(spark, inputPath, workingDir, outputPath);
            });
    }

    // for each result we emit its id, the journal (if present), the instance, and the instance's hostedby
    public static <R extends Result> void mapResult(SparkSession spark, String inputPath,
        String workingDir, String outputPath) {

        // emit the snippets of the entities to be embedded in other entities (dematerialization step)
        // emitMinEntities(spark, inputPath, workingDir);

        // select the relevant relations from the result type to the other entities. Only the semantically
        // relevant ones are considered
        selectRelations(spark, inputPath, workingDir);

        // merge relations and manifestations belonging to the same result
        getRelationAndManifestation(spark, workingDir, inputPath);

        // dump the result and enrich it with the relevant information from relations and manifestations
        dumpResult(spark, inputPath, workingDir, outputPath);
    }

//    private static void emitMinEntities(SparkSession spark, String inputPath, String workingDir) {
//
//        Utils.readPath(spark, inputPath + "organization", Organization.class)
//            .filter((FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference())
//            .map((MapFunction<Organization, EncloseMinElement>) o -> {
//                EncloseMinElement eme = new EncloseMinElement();
//                eme.setEnclosedEntityId(o.getId());
//                eme.setMinOrganization(Utils.getMinOrganization(o));
//                return eme;
//            }, Encoders.bean(EncloseMinElement.class))
//            .write()
//            .mode(SaveMode.Overwrite)
//            .option("compression", "gzip")
//            .json(workingDir + "encloseMinEntity");
//
//        Utils.readPath(spark, inputPath + "project", Project.class)
//            .filter((FilterFunction<Project>) p -> !p.getDataInfo().getDeletedbyinference())
//            .map((MapFunction<Project, EncloseMinElement>) p -> {
//                EncloseMinElement eme = new EncloseMinElement();
//                eme.setEnclosedEntityId(p.getId());
//                eme.setMinGrant(Utils.getMinGrant(p));
//                return eme;
//            }, Encoders.bean(EncloseMinElement.class))
//            .write()
//            .mode(SaveMode.Append)
//            .option("compression", "gzip")
//            .json(workingDir + "encloseMinEntity");
//
//        getMinProduct(spark, inputPath + "publication", Publication.class)
//            .union(getMinProduct(spark, inputPath + "dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class))
//            .union(getMinProduct(spark, inputPath + "software", Software.class))
//            .union(getMinProduct(spark, inputPath + "otherresearchproduct", OtherResearchProduct.class))
//            .write()
//            .mode(SaveMode.Append)
//            .option("compression", "gzip")
//            .json(workingDir + "encloseMinEntity");
//    }
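
    /**
     * For each result type, joins the manifestations emitted per result with the datasources of type
     * "Journal archive" (to recover venue information), groups them by result id, and attaches the aggregated
     * relations (related products, organizations, funding) produced by selectRelations.
     */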
    private static void getRelationAndManifestation(SparkSession spark, String workingDir, String inputPath) {
        Dataset<RelationPerProduct> aggRelations = Utils
            .readPath(spark, workingDir + "aggrelation", RelationPerProduct.class);

        ModelSupport.entityTypes
            .keySet()
            .stream()
            .filter(ModelSupport::isResult)
            .forEach(e -> {
                Utils.removeOutputDir(spark, workingDir + e.name() + "/partialresearchproduct");

                Dataset<Datasource> datasource = Utils
                    .readPath(spark, inputPath + "datasource", Datasource.class)
                    .filter(
                        (FilterFunction<Datasource>) d -> Optional.ofNullable(d.getEoscdatasourcetype()).isPresent() &&
                            d.getEoscdatasourcetype().getClassid().equalsIgnoreCase("Journal archive"));

                Dataset<EmitPerManifestation> man = Utils
                    .readPath(spark, workingDir + e.name() + "/manifestation", EmitPerManifestation.class);

                // group the manifestations (left-joined with their hosting journal archive, when any) per result
                Dataset<PartialResearchProduct> partialResearchProduct = man
                    .joinWith(datasource, man.col("instance.hostedby.key").equalTo(datasource.col("id")), "left")
                    .groupByKey(
                        (MapFunction<Tuple2<EmitPerManifestation, Datasource>, String>) t2 -> t2._1().getResultId(),
                        Encoders.STRING())
                    .mapGroups(
                        (MapGroupsFunction<String, Tuple2<EmitPerManifestation, Datasource>, PartialResearchProduct>) (
                            k, v) -> {
                            PartialResearchProduct prp = new PartialResearchProduct();
                            prp.setResultId(k);
                            List<Manifestation> manifestationList = new ArrayList<>();
                            while (v.hasNext())
                                manifestationList.add(getManifestation(v.next()));
                            prp.setManifestations(manifestationList);
                            return prp;
                        }, Encoders.bean(PartialResearchProduct.class));

                // enrich each partial product with the relations aggregated per result
                partialResearchProduct
                    .joinWith(
                        aggRelations, partialResearchProduct.col("resultId").equalTo(aggRelations.col("resultId")),
                        "left")
                    .map(
                        (MapFunction<Tuple2<PartialResearchProduct, RelationPerProduct>, PartialResearchProduct>) t2 -> {
                            PartialResearchProduct prp = t2._1();
                            if (Optional.ofNullable(t2._2()).isPresent()) {
                                prp
                                    .setRelated_products(
                                        t2
                                            ._2()
                                            .getRelatedProduct()
                                            .keySet()
                                            .stream()
                                            .map(key -> Relations.newInstance(key, t2._2().getRelatedProduct().get(key)))
                                            .collect(Collectors.toList()));
                                prp.setRelevant_organizations(t2._2().getOrganizations());
                                prp.setFunding(t2._2().getFunding());
                            }
                            return prp;
                        }, Encoders.bean(PartialResearchProduct.class))
                    .write()
                    .mode(SaveMode.Overwrite)
                    .option("compression", "gzip")
                    .json(workingDir + e.name() + "/partialresearchproduct");
            });
    }
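
    /**
     * Maps one (manifestation, datasource) pair onto the SKG-IF Manifestation model: local type, dates,
     * peer review, access right, licence, url and pid; when the hosting datasource is a journal archive,
     * the bibliographic info and the venue are set as well.
     */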
    private static Manifestation getManifestation(Tuple2<EmitPerManifestation, Datasource> t2) {
        // if the datasource side of the join (t2._2()) is present we also have the biblio and the venue,
        // otherwise only the other values can be set
        EmitPerManifestation epm = t2._1();
        Manifestation manifestation = new Manifestation();
        manifestation.setProduct_local_type(epm.getInstance().getInstancetype().getClassname());
        manifestation.setProduct_local_type_schema(epm.getInstance().getInstancetype().getSchemename());
        if (Optional.ofNullable(epm.getInstance().getDateofacceptance()).isPresent())
            manifestation
                .setDates(
                    Arrays
                        .asList(
                            Dates.newInstance(epm.getInstance().getDateofacceptance().getValue(), "publishing")));
        if (Optional.ofNullable(epm.getInstance().getRefereed()).isPresent())
            switch (epm.getInstance().getRefereed().getClassid()) {
                case "0000":
                    manifestation.setPeer_review(PeerReview.UNAVAILABLE.label);
                    break;
                case "0001":
                    manifestation.setPeer_review(PeerReview.PEER_REVIEWED.label);
                    break;
                case "0002":
                    manifestation.setPeer_review(PeerReview.NON_PEER_REVIEWED.label);
                    break;
            }

        manifestation.setMetadata_curation("unavailable");
        if (Optional.ofNullable(epm.getInstance().getAccessright()).isPresent())
            switch (epm.getInstance().getAccessright().getClassid()) {
                case "OPEN":
                case "OPEN DATA":
                case "OPEN SOURCE":
                    manifestation.setAccess_right(AccessRight.OPEN.label);
                    break;
                case "CLOSED":
                    manifestation.setAccess_right(AccessRight.CLOSED.label);
                    break;
                case "RESTRICTED":
                    manifestation.setAccess_right(AccessRight.RESTRICTED.label);
                    break;
                case "EMBARGO":
                case "12MONTHS":
                case "6MONTHS":
                    manifestation.setAccess_right(AccessRight.EMBARGO.label);
                    break;
                default:
                    manifestation.setAccess_right(AccessRight.UNAVAILABLE.label);
            }

        manifestation
            .setLicence(
                Optional
                    .ofNullable(epm.getInstance().getLicense())
                    .map(value -> value.getValue())
                    .orElse(null));
        if (Optional.ofNullable(epm.getInstance().getUrl()).isPresent() && epm.getInstance().getUrl().size() > 0)
            manifestation.setUrl(epm.getInstance().getUrl().get(0));
        else
            manifestation.setUrl(null);

        if (Optional.ofNullable(epm.getInstance().getPid()).isPresent() && epm.getInstance().getPid().size() > 0) {
            manifestation.setPid(epm.getInstance().getPid().get(0).getValue());
        }
        if (Optional.ofNullable(t2._2()).isPresent()) {
            manifestation.setBiblio(getBiblio(epm));
            if (Optional.ofNullable(t2._2().getJournal().getIssnPrinted()).isPresent())
                manifestation
                    .setVenue(
                        MinVenue
                            .newInstance(
                                Utils.getIdentifier(Prefixes.VENUE, t2._1().getJournal().getIssnPrinted()),
                                t2._1().getJournal().getName()));
            else if (Optional.ofNullable(t2._2().getJournal().getIssnOnline()).isPresent())
                manifestation
                    .setVenue(
                        MinVenue
                            .newInstance(
                                Utils.getIdentifier(Prefixes.VENUE, t2._1().getJournal().getIssnOnline()),
                                t2._1().getJournal().getName()));
        }
        manifestation
            .setHosting_datasource(
                MinVenue
                    .newInstance(
                        Utils.getIdentifier(Prefixes.DATASOURCE, epm.getInstance().getHostedby().getKey()),
                        epm.getInstance().getHostedby().getValue()));

        return manifestation;
    }

    private static Biblio getBiblio(EmitPerManifestation epm) {
        Biblio biblio = new Biblio();
        biblio.setEdition(epm.getJournal().getEdition());
        biblio.setIssue(epm.getJournal().getIss());
        biblio.setPublisher(epm.getPublisher());
        biblio.setVolume(epm.getJournal().getVol());
        biblio.setEnd_page(epm.getJournal().getEp());
        biblio.setStart_page(epm.getJournal().getSp());
        return biblio;
    }
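
    /**
     * Joins each result with its PartialResearchProduct (manifestations plus aggregated relations), maps it to
     * the SKG-IF ResearchProduct via ResultMapper, and finally unions the per-type dumps into a single
     * ResearchProduct dataset under outputPath.
     */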
    private static <R extends Result> void dumpResult(SparkSession spark, String inputPath, String workingDir,
        String outputPath) {
        ModelSupport.entityTypes
            .keySet()
            .parallelStream()
            .filter(ModelSupport::isResult)
            .forEach(e -> {
                Class<R> resultClazz = ModelSupport.entityTypes.get(e);
                Utils.removeOutputDir(spark, workingDir + e.name() + "/researchproduct");
                Dataset<R> results = Utils.readPath(spark, inputPath + e.name(), resultClazz);
                Dataset<PartialResearchProduct> prr = Utils
                    .readPath(spark, workingDir + e.name() + "/partialresearchproduct", PartialResearchProduct.class);

                results
                    .joinWith(prr, results.col("id").equalTo(prr.col("resultId")), "left")
                    .map((MapFunction<Tuple2<R, PartialResearchProduct>, ResearchProduct>) t2 -> {
                        ResearchProduct rp = ResultMapper.map(t2._1());
                        if (Optional.ofNullable(t2._2()).isPresent()) {
                            if (Optional.ofNullable(t2._2().getRelated_products()).isPresent())
                                rp.setRelated_products(t2._2().getRelated_products());
                            if (Optional.ofNullable(t2._2().getFunding()).isPresent())
                                rp.setFunding(t2._2().getFunding());
                            if (Optional.ofNullable(t2._2().getRelevant_organizations()).isPresent())
                                rp.setRelevant_organizations(t2._2().getRelevant_organizations());
                            if (Optional.ofNullable(t2._2().getManifestations()).isPresent())
                                rp.setManifestations(t2._2().getManifestations());
                        }
                        return rp;
                    }, Encoders.bean(ResearchProduct.class))
                    .write()
                    .mode(SaveMode.Overwrite)
                    .option("compression", "gzip")
                    .json(workingDir + e.name() + "/researchproduct");
            });

        // union the per-type dumps into a single ResearchProduct dataset
        Dataset<ResearchProduct> researchProducts = spark.emptyDataset(Encoders.bean(ResearchProduct.class));
        for (EntityType e : ModelSupport.entityTypes.keySet()) {
            if (ModelSupport.isResult(e))
                researchProducts = researchProducts
                    .union(Utils.readPath(spark, workingDir + e.name() + "/researchproduct", ResearchProduct.class));
        }
        researchProducts
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "ResearchProduct");
    }
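
    /**
     * Selects the non-deleted, visible relations whose semantics is relevant for the dump (affiliation, funding,
     * supplement, documents, part, version, citation), resolves their targets against the min-entity snapshots
     * and aggregates them per result id into RelationPerProduct records written to workingDir + "aggrelation".
     */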
    private static void selectRelations(SparkSession spark, String inputPath, String workingDir) {
        Dataset<Relation> relation = Utils
            .readPath(
                spark,
                inputPath + "relation", Relation.class)
            .filter(
                (FilterFunction<Relation>) r -> !r.getDataInfo().getDeletedbyinference() &&
                    !r.getDataInfo().getInvisible())
            .filter(
                (FilterFunction<Relation>) r -> r
                    .getRelClass()
                    .equalsIgnoreCase(RelationType.RESULT_AFFILIATIED_TO_ORGANIZATION.label) ||
                    r.getRelClass().equalsIgnoreCase(RelationType.RESULT_OUTCOME_FUNDING.label) ||
                    r.getRelClass().equalsIgnoreCase(RelationType.SUPPLEMENT.label) ||
                    r.getRelClass().equalsIgnoreCase(RelationType.DOCUMENTS.label) ||
                    r.getRelClass().equalsIgnoreCase(RelationType.PART.label) ||
                    r.getRelClass().equalsIgnoreCase(RelationType.VERSION.label) ||
                    r.getRelClass().equalsIgnoreCase(RelationType.CITATION.label));

        // the min-entity snapshots are expected to be already available under workingDir + "minEntity"
        Dataset<EncloseMinElement> encloseMinEntity = Utils
            .readPath(spark, workingDir + "minEntity", EncloseMinElement.class);

        relation
            .joinWith(encloseMinEntity, relation.col("target").equalTo(encloseMinEntity.col("enclosedEntityId")))
            .map((MapFunction<Tuple2<Relation, EncloseMinElement>, EncloseMinElement>) t2 -> {
                EncloseMinElement eme = t2._2();
                eme.setResultId(t2._1().getSource());
                eme.setSemantics(t2._1().getRelClass());
                return eme;
            }, Encoders.bean(EncloseMinElement.class))
            .groupByKey((MapFunction<EncloseMinElement, String>) eme -> eme.getResultId(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, EncloseMinElement, RelationPerProduct>) (k, v) -> {
                RelationPerProduct rpp = new RelationPerProduct();
                rpp.setResultId(k);
                v.forEachRemaining(e -> insertEnclosedElement(rpp, e));
                return rpp;
            }, Encoders.bean(RelationPerProduct.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            // no leading slash, to stay consistent with the read path used in getRelationAndManifestation
            .json(workingDir + "aggrelation");
    }
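
    /**
     * Adds the min element carried by a relation to the proper collection of the per-product aggregation,
     * depending on whether it wraps an organization, a grant or a related product (keyed by relation semantics).
     */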
    private static void insertEnclosedElement(RelationPerProduct rpp, EncloseMinElement element) {
        if (Optional.ofNullable(element.getMinOrganization()).isPresent())
            rpp.getOrganizations().add(element.getMinOrganization());
        if (Optional.ofNullable(element.getMinGrant()).isPresent())
            rpp.getFunding().add(element.getMinGrant());
        if (Optional.ofNullable(element.getMinProduct()).isPresent()) {
            String sem = element.getSemantics();
            rpp.getRelatedProduct().computeIfAbsent(sem, s -> new ArrayList<>()).add(element.getMinProduct());
        }
    }
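
    /**
     * Builds the minimal product representation (EncloseMinElement) for every non-deleted, visible result read
     * from the given path. Currently referenced only by the commented-out emitMinEntities step above.
     */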
    private static <R extends Result> Dataset<EncloseMinElement> getMinProduct(SparkSession spark, String inputPath,
        Class<R> clazz) {
        return Utils
            .readPath(spark, inputPath, clazz)
            .filter((FilterFunction<R>) r -> !r.getDataInfo().getDeletedbyinference() &&
                !r.getDataInfo().getInvisible())
            .map((MapFunction<R, EncloseMinElement>) r -> {
                EncloseMinElement eme = new EncloseMinElement();
                eme.setEnclosedEntityId(r.getId());
                eme.setMinProduct(Utils.getMinProduct(r));
                return eme;
            }, Encoders.bean(EncloseMinElement.class));
    }
}