package eu.dnetlib.dhp.oa.graph.dump.skgif;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.skgif.model.*;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;

/**
 * @author miriam.baglioni
 * @Date 06/02/24
 */
public class EmitFromResults implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(EmitFromResults.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				EmitFromResults.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/emit_biblio_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String workingDir = parser.get("workingDir");
		log.info("workingDir: {}", workingDir);

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
				emitFromResult(spark, inputPath, outputPath, workingDir);
			});
	}

	// for each result, emit its id, the journal (if any), each instance and the hostedby of the instance
	public static void emitFromResult(SparkSession spark, String inputPath, String outputPath,
		String workingDir) {
		emitManifestation(spark, inputPath, workingDir);
		emitPerson(spark, inputPath, outputPath, workingDir);
		emitTopic(spark, inputPath, outputPath, workingDir);
	}
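	/**
	 * For every result type, selects the subjects whose qualifier classid is "fos" or "sdg" and emits one
	 * {@link Topic} per subject value under workingDir/&lt;type&gt;/topic. The per-type outputs are then
	 * unioned and deduplicated on local_identifier before being written to outputPath/Topic.
	 */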
	private static <R extends Result> void emitTopic(SparkSession spark, String inputPath, String outputPath,
		String workingDir) {
		ModelSupport.entityTypes.keySet().forEach(e -> {
			if (ModelSupport.isResult(e)) {
				Class<R> resultClazz = (Class<R>) ModelSupport.entityTypes.get(e);
				Utils
					.readPath(spark, inputPath + e.name(), resultClazz)
					.filter((FilterFunction<R>) r -> Optional.ofNullable(r.getSubject()).isPresent())
					.flatMap(
						(FlatMapFunction<R, Topic>) r -> r
							.getSubject()
							.stream()
							.filter(
								s -> s.getQualifier().getClassid().equalsIgnoreCase("fos")
									|| s.getQualifier().getClassid().equalsIgnoreCase("sdg"))
							.map(s -> {
								Topic t = new Topic();
								t
									.setLocal_identifier(
										Utils
											.getIdentifier(
												Prefixes.TOPIC, s.getQualifier().getClassid() + s.getValue()));
								t
									.setIdentifiers(
										Arrays
											.asList(
												Identifier.newInstance(s.getQualifier().getClassid(), s.getValue())));
								t.setName(s.getValue());
								return t;
							})
							.collect(Collectors.toList())
							.iterator(),
						Encoders.bean(Topic.class))
					.write()
					.mode(SaveMode.Overwrite)
					.option("compression", "gzip")
					.json(workingDir + e.name() + "/topic");
			}
		});

		Dataset<Topic> topics = spark.emptyDataset(Encoders.bean(Topic.class));
		for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
			if (ModelSupport.isResult(entityType))
				topics = topics.union(Utils.readPath(spark, workingDir + entityType.name() + "/topic", Topic.class));
		}
		topics
			.groupByKey((MapFunction<Topic, String>) p -> p.getLocal_identifier(), Encoders.STRING())
			.mapGroups((MapGroupsFunction<String, Topic, Topic>) (k, v) -> v.next(), Encoders.bean(Topic.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath + "/Topic");
	}

	/**
	 * For every result type, emits one {@link Persons} record per author. Authors with an ORCID get a person
	 * identifier derived from it (tagged "orcid" or "inferred_orcid"); authors with other pids get a temporary
	 * identifier derived from the result id and the author rank (or position). The per-type outputs are unioned
	 * and deduplicated on local_identifier before being written to outputPath/Persons.
	 */
	private static <R extends Result> void emitPerson(SparkSession spark, String inputPath, String outputPath,
		String workingDir) {
		ModelSupport.entityTypes.keySet().forEach(e -> {
			if (ModelSupport.isResult(e)) {
				Class<R> resultClazz = (Class<R>) ModelSupport.entityTypes.get(e);
				Utils
					.readPath(spark, inputPath + e.name(), resultClazz)
					.flatMap((FlatMapFunction<R, Persons>) r -> {
						List<Persons> authors = new ArrayList<>();
						if (Optional.ofNullable(r.getAuthor()).isPresent() && r.getAuthor().size() > 0) {
							int count = 0;
							for (Author a : r.getAuthor()) {
								count += 1;
								Persons p = new Persons();
								p.setFamily_name(a.getSurname());
								p.setGiven_name(a.getName());
								// authors without a pid keep an empty local identifier
								String identifier = "";
								if (Optional.ofNullable(a.getPid()).isPresent()) {
									Tuple2<String, Boolean> orcid = eu.dnetlib.dhp.oa.graph.dump.skgif.Utils
										.getOrcid(a.getPid());
									if (orcid != null) {
										identifier = Utils.getIdentifier(Prefixes.PERSON, orcid._1() + orcid._2());
										if (orcid._2())
											p
												.setIdentifiers(
													Arrays.asList(Identifier.newInstance("orcid", orcid._1())));
										else
											p
												.setIdentifiers(
													Arrays
														.asList(Identifier.newInstance("inferred_orcid", orcid._1())));
									} else {
										if (Optional.ofNullable(a.getRank()).isPresent()) {
											identifier = Utils
												.getIdentifier(Prefixes.TEMPORARY_PERSON, r.getId() + a.getRank());
										} else {
											identifier = Utils
												.getIdentifier(Prefixes.TEMPORARY_PERSON, r.getId() + count);
										}
									}
								}
								p.setLocal_identifier(identifier);
								authors.add(p);
							}
						}
						return authors.iterator();
					}, Encoders.bean(Persons.class))
					.filter(Objects::nonNull)
					.write()
					.mode(SaveMode.Overwrite)
					.option("compression", "gzip")
					.json(workingDir + e.name() + "/person");
			}
		});

		Dataset<Persons> persons = spark.emptyDataset(Encoders.bean(Persons.class));
		for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
			if (ModelSupport.isResult(entityType))
				persons = persons
					.union(Utils.readPath(spark, workingDir + entityType.name() + "/person", Persons.class));
		}
		persons
			.groupByKey((MapFunction<Persons, String>) p -> p.getLocal_identifier(), Encoders.STRING())
			.mapGroups(
				(MapGroupsFunction<String, Persons, Persons>) (k, v) -> v.next(), Encoders.bean(Persons.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath + "/Persons");
	}
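	/**
	 * For every result type, emits one {@link EmitPerManifestation} record per instance, carrying the result
	 * id, the instance, the hostedby key, the publisher and, for publications, the journal. The per-type
	 * outputs are unioned and reduced to a single record per hostedby key, written to
	 * workingDir/datasourcePublisher.
	 */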
	private static <R extends Result> void emitManifestation(SparkSession spark, String inputPath,
		String workingDir) {
		// datasources of EOSC type "Journal archive"; built here but not referenced further within this method
		Dataset<Datasource> datasource = Utils
			.readPath(spark, inputPath + "datasource", Datasource.class)
			.filter(
				(FilterFunction<Datasource>) d -> Optional.ofNullable(d.getEosctype()).isPresent()
					&& d.getEosctype().getClassname().equalsIgnoreCase("Journal archive"));

		ModelSupport.entityTypes.keySet().forEach(e -> {
			if (ModelSupport.isResult(e)) {
				Class<R> resultClazz = (Class<R>) ModelSupport.entityTypes.get(e);
				// Dataset<EmitPerManifestation> emitformanifestation =
				Utils
					.readPath(spark, inputPath + e.name(), resultClazz)
					.flatMap(
						(FlatMapFunction<R, EmitPerManifestation>) p -> p.getInstance().stream().map(i -> {
							EmitPerManifestation epb = new EmitPerManifestation();
							epb.setResultId(p.getId());
							epb.setInstance(i);
							epb.setHostedBy(i.getHostedby().getKey());
							epb
								.setPublisher(
									Optional
										.ofNullable(p.getPublisher())
										.map(v -> v.getValue())
										.orElse(""));
							if (p.getClass() == Publication.class) {
								epb.setJournal(((Publication) p).getJournal());
							}
							return epb;
						}).collect(Collectors.toList()).iterator(),
						Encoders.bean(EmitPerManifestation.class))
					.write()
					.mode(SaveMode.Overwrite)
					.option("compression", "gzip")
					.json(workingDir + e.name() + "/manifestation");
			}
		});

		Dataset<EmitPerManifestation> emitPerManifestationDataset = Utils
			.readPath(spark, workingDir + "software/manifestation", EmitPerManifestation.class)
			.union(
				Utils.readPath(spark, workingDir + "dataset/manifestation", EmitPerManifestation.class))
			.union(
				Utils.readPath(spark, workingDir + "publication/manifestation", EmitPerManifestation.class))
			.union(
				Utils
					.readPath(
						spark, workingDir + "otherresearchproduct/manifestation", EmitPerManifestation.class));

		emitPerManifestationDataset
			.groupByKey((MapFunction<EmitPerManifestation, String>) p -> p.getHostedBy(), Encoders.STRING())
			.mapGroups(
				(MapGroupsFunction<String, EmitPerManifestation, EmitPerManifestation>) (k, v) -> v.next(),
				Encoders.bean(EmitPerManifestation.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(workingDir + "/datasourcePublisher");
	}
}
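/*
 * Minimal usage sketch, not part of the job itself: the same emission can be driven from an already
 * available SparkSession (e.g. in a local test) by calling emitFromResult directly. The paths below
 * are hypothetical placeholders; in production the entry point is main(), which reads sourcePath,
 * outputPath and workingDir from the parameters defined in emit_biblio_parameters.json.
 *
 *   SparkSession spark = SparkSession
 *       .builder()
 *       .master("local[*]")
 *       .appName("EmitFromResults-sketch")
 *       .getOrCreate();
 *   EmitFromResults.emitFromResult(spark, "/tmp/graph/", "/tmp/dump/", "/tmp/working/");
 *   spark.stop();
 */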