package eu.dnetlib.dhp.oa.graph.dump.skgif;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.io.Serializable;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EmitPerManifestation;
import eu.dnetlib.dhp.oa.graph.dump.skgif.beans.EncloseMinElement;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.skgif.model.*;
import scala.Tuple2;

/**
 * @author miriam.baglioni
 * @Date 06/02/24
 */
public class EmitFromEntities implements Serializable {
	private static final Logger log = LoggerFactory.getLogger(EmitFromEntities.class);

	public static void main(String[] args) throws Exception {
		String jsonConfiguration = IOUtils
			.toString(
				EmitFromEntities.class
					.getResourceAsStream(
						"/eu/dnetlib/dhp/oa/graph/dump/emit_biblio_parameters.json"));

		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
		parser.parseArgument(args);

		Boolean isSparkSessionManaged = Optional
			.ofNullable(parser.get("isSparkSessionManaged"))
			.map(Boolean::valueOf)
			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

		final String inputPath = parser.get("sourcePath");
		log.info("inputPath: {}", inputPath);

		final String outputPath = parser.get("outputPath");
		log.info("outputPath: {}", outputPath);

		final String workingDir = parser.get("workingDir");
		log.info("workingDir: {}", workingDir);

		SparkConf conf = new SparkConf();

		runWithSparkSession(
			conf,
			isSparkSessionManaged,
			spark -> {
				Utils.removeOutputDir(spark, outputPath);
				emitFromResult(spark, inputPath, outputPath, workingDir);
				emitFromDatasource(spark, inputPath, workingDir);
				emitFromOrganization(spark, inputPath, workingDir);
				emitFromProject(spark, inputPath, workingDir);
			});
	}

	// emits the minimal grant element for every project that is not deleted by inference
	private static void emitFromProject(SparkSession spark, String inputPath, String workingDir) {
		Utils
			.readPath(spark, inputPath + "project", Project.class)
			.filter((FilterFunction<Project>) p -> !p.getDataInfo().getDeletedbyinference())
			.map((MapFunction<Project, EncloseMinElement>) p -> {
				EncloseMinElement eme = new EncloseMinElement();
				eme.setEnclosedEntityId(p.getId());
				eme.setMinGrant(Utils.getMinGrant(p));
				return eme;
			}, Encoders.bean(EncloseMinElement.class))
			.write()
			.mode(SaveMode.Append)
			.option("compression", "gzip")
			.json(workingDir + "/minEntity");
	}

	// emits the minimal organization element for every organization that is not deleted by inference
	private static void emitFromOrganization(SparkSession spark, String inputPath, String workingDir) {
		Utils
			.readPath(spark, inputPath + "organization", Organization.class)
			.filter((FilterFunction<Organization>) o -> !o.getDataInfo().getDeletedbyinference())
			.map((MapFunction<Organization, EncloseMinElement>) o -> {
				EncloseMinElement eme = new EncloseMinElement();
				eme.setMinOrganization(Utils.getMinOrganization(o));
				eme.setEnclosedEntityId(o.getId());
				return eme;
			}, Encoders.bean(EncloseMinElement.class))
			.write()
			.mode(SaveMode.Append)
			.option("compression", "gzip")
			.json(workingDir + "/minEntity");
	}

	// emits the minimal datasource element and, for journal archives, the minimal venue built from the ISSN
	private static void emitFromDatasource(SparkSession spark, String inputPath, String workingDir) {
		Utils
			.readPath(spark, inputPath + "datasource", Datasource.class)
			.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference())
			.map((MapFunction<Datasource, EncloseMinElement>) d -> {
				EncloseMinElement eme = new EncloseMinElement();
				eme
					.setMinDatsource(
						MinVenue
							.newInstance(
								Utils.getIdentifier(Prefixes.DATASOURCE, d.getId()),
								d.getOfficialname().getValue()));
				eme.setEnclosedEntityId(d.getId());
				return eme;
			}, Encoders.bean(EncloseMinElement.class))
			.write()
			.mode(SaveMode.Append)
			.option("compression", "gzip")
			.json(workingDir + "/minEntity");

		Utils
			.readPath(spark, inputPath + "datasource", Datasource.class)
			.filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference())
			.filter(
				(FilterFunction<Datasource>) d -> Optional.ofNullable(d.getEoscdatasourcetype()).isPresent()
					&& d
						.getEoscdatasourcetype()
						.getClassid()
						.equalsIgnoreCase("Journal archive"))
			.map((MapFunction<Datasource, EncloseMinElement>) d -> {
				EncloseMinElement eme = new EncloseMinElement();
				eme.setEnclosedEntityId(d.getId());
				if (Optional.ofNullable(d.getJournal()).isPresent() &&
					Optional.ofNullable(d.getJournal().getIssnPrinted()).isPresent()) {
					eme
						.setMinVenue(
							MinVenue
								.newInstance(
									Utils.getIdentifier(Prefixes.VENUE, d.getJournal().getIssnPrinted()),
									d.getOfficialname().getValue()));
					return eme;
				}
				if (Optional.ofNullable(d.getJournal()).isPresent() &&
					Optional.ofNullable(d.getJournal().getIssnOnline()).isPresent()) {
					eme
						.setMinVenue(
							MinVenue
								.newInstance(
									Utils.getIdentifier(Prefixes.VENUE, d.getJournal().getIssnOnline()),
									d.getOfficialname().getValue()));
					return eme;
				}
				return null;
			}, Encoders.bean(EncloseMinElement.class))
			.filter(Objects::nonNull)
			.write()
			.mode(SaveMode.Append)
			.option("compression", "gzip")
			.json(workingDir + "/minEntity");
	}

	// for each result it emits the id + the journal (if any) + the instance + the hostedBy of the instance
	public static void emitFromResult(SparkSession spark, String inputPath, String outputPath,
		String workingDir) {
		emitManifestation(spark, inputPath, workingDir);
		emitPerson(spark, inputPath, outputPath, workingDir);
		emitTopic(spark, inputPath, outputPath, workingDir);
		emitMinProduct(spark, inputPath, workingDir);
	}

	// emits the minimal product element for every result type
	private static <R extends Result> void emitMinProduct(SparkSession spark, String inputPath, String workingDir) {
		Utils.removeOutputDir(spark, workingDir + "minEntity");
		ModelSupport.entityTypes.keySet().forEach(e -> {
			if (ModelSupport.isResult(e)) {
				Class<R> resultClazz = ModelSupport.entityTypes.get(e);
				Utils
					.readPath(spark, inputPath + e.name(), resultClazz)
					.map((MapFunction<R, EncloseMinElement>) p -> {
						EncloseMinElement eme = new EncloseMinElement();
						eme.setMinProduct(Utils.getMinProduct(p));
						eme.setEnclosedEntityId(p.getId());
						return eme;
					}, Encoders.bean(EncloseMinElement.class))
					.write()
					.mode(SaveMode.Append)
					.option("compression", "gzip")
					.json(workingDir + "/minEntity");
			}
		});
	}

	// emits one Topic for every FOS or SDG subject of every result
	private static <R extends Result> void emitTopic(SparkSession spark, String inputPath, String outputPath,
		String workingDir) {
		ModelSupport.entityTypes.keySet().forEach(e -> {
			if (ModelSupport.isResult(e)) {
				Class<R> resultClazz = ModelSupport.entityTypes.get(e);
				Utils
					.readPath(spark, inputPath + e.name(), resultClazz)
					.filter((FilterFunction<R>) r -> Optional.ofNullable(r.getSubject()).isPresent())
					.flatMap(
						(FlatMapFunction<R, Topic>) r -> r
							.getSubject()
							.stream()
							.filter(
								s -> s.getQualifier().getClassid().equalsIgnoreCase("fos")
									|| s.getQualifier().getClassid().equalsIgnoreCase("sdg"))
							.map(s -> {
								Topic t = new Topic();
								t
									.setLocal_identifier(
										Utils
											.getIdentifier(
												Prefixes.TOPIC,
												s.getQualifier().getClassid() + s.getValue()));
								t
									.setIdentifiers(
										Arrays
											.asList(
												Identifier.newInstance(s.getQualifier().getClassid(), s.getValue())));
								t.setName(s.getValue());
								return t;
							})
							.collect(Collectors.toList())
							.iterator(),
						Encoders.bean(Topic.class))
					.write()
					.mode(SaveMode.Overwrite)
					.option("compression", "gzip")
					.json(workingDir + e.name() + "/topic");
			}
		});

		Dataset<Topic> topics = spark.emptyDataset(Encoders.bean(Topic.class));
		for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
			if (ModelSupport.isResult(entityType))
				topics = topics.union(Utils.readPath(spark, workingDir + entityType.name() + "/topic", Topic.class));
		}
		topics
			.groupByKey((MapFunction<Topic, String>) p -> p.getLocal_identifier(), Encoders.STRING())
			.mapGroups((MapGroupsFunction<String, Topic, Topic>) (k, v) -> v.next(), Encoders.bean(Topic.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath + "/Topic");
	}

	// emits one Persons record per author of every result, using the ORCID when available
	private static <R extends Result> void emitPerson(SparkSession spark, String inputPath, String outputPath,
		String workingDir) {
		ModelSupport.entityTypes.keySet().forEach(e -> {
			if (ModelSupport.isResult(e)) {
				Class<R> resultClazz = ModelSupport.entityTypes.get(e);
				Utils
					.readPath(spark, inputPath + e.name(), resultClazz)
					.flatMap((FlatMapFunction<R, Persons>) r -> {
						List<Persons> authors = new ArrayList<>();
						if (Optional.ofNullable(r.getAuthor()).isPresent() && r.getAuthor().size() > 0) {
							int count = 0;
							for (Author a : r.getAuthor()) {
								count += 1;
								Persons p = new Persons();
								p.setFamily_name(a.getSurname());
								p.setGiven_name(a.getName());
								String identifier = new String();
								if (Optional.ofNullable(a.getPid()).isPresent()) {
									Tuple2<String, Boolean> orcid = eu.dnetlib.dhp.oa.graph.dump.skgif.Utils
										.getOrcid(a.getPid());
									if (orcid != null) {
										identifier = Utils.getIdentifier(Prefixes.PERSON, orcid._1() + orcid._2());
										if (orcid._2())
											p
												.setIdentifiers(
													Arrays.asList(Identifier.newInstance("orcid", orcid._1())));
										else
											p
												.setIdentifiers(
													Arrays
														.asList(Identifier.newInstance("inferred_orcid", orcid._1())));
									} else {
										if (Optional.ofNullable(a.getRank()).isPresent()) {
											identifier = Utils
												.getIdentifier(Prefixes.TEMPORARY_PERSON, r.getId() + a.getRank());
										} else {
											identifier = Utils
												.getIdentifier(Prefixes.TEMPORARY_PERSON, r.getId() + count);
										}
									}
								}
								p.setLocal_identifier(identifier);
								authors.add(p);
							}
						}
						return authors.iterator();
					}, Encoders.bean(Persons.class))
					.filter((FilterFunction<Persons>) p -> p != null)
					.write()
					.mode(SaveMode.Overwrite)
					.option("compression", "gzip")
					.json(workingDir + e.name() + "/person");
			}
		});

		Dataset<Persons> persons = spark.emptyDataset(Encoders.bean(Persons.class));
		for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
			if (ModelSupport.isResult(entityType))
				persons = persons
					.union(Utils.readPath(spark, workingDir + entityType.name() + "/person", Persons.class));
		}
		persons
			.groupByKey((MapFunction<Persons, String>) p -> p.getLocal_identifier(), Encoders.STRING())
			.mapGroups(
				(MapGroupsFunction<String, Persons, Persons>) (k, v) -> v.next(), Encoders.bean(Persons.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(outputPath + "/Persons");
	}

	// emits one EmitPerManifestation per result instance, keeping journal, hostedBy and publisher information
	private static <R extends Result> void emitManifestation(SparkSession spark, String inputPath, String workingDir) {
		ModelSupport.entityTypes.keySet().forEach(e -> {
			if (ModelSupport.isResult(e)) {
				Class<R> resultClazz = ModelSupport.entityTypes.get(e);
				Utils
					.readPath(spark, inputPath + e.name(), resultClazz)
					.flatMap((FlatMapFunction<R, EmitPerManifestation>) p -> p.getInstance().stream().map(i -> {
						EmitPerManifestation epb = new EmitPerManifestation();
						epb.setResultId(p.getId());
						epb.setInstance(i);
						epb.setHostedBy(i.getHostedby().getKey());
						epb
							.setPublisher(
								Optional
									.ofNullable(p.getPublisher())
									.map(v -> v.getValue())
									.orElse(new String()));
						if (p.getClass() == Publication.class) {
							epb.setJournal(((Publication) p).getJournal());
						}
						return epb;
					}).collect(Collectors.toList()).iterator(), Encoders.bean(EmitPerManifestation.class))
					.write()
					.mode(SaveMode.Overwrite)
					.option("compression", "gzip")
					.json(workingDir + e.name() + "/manifestation");
			}
		});

		Dataset<EmitPerManifestation> emitPerManifestationDataset = Utils
			.readPath(
				spark, workingDir + "software/manifestation",
				EmitPerManifestation.class)
			.union(
				Utils
					.readPath(
						spark, workingDir + "dataset/manifestation",
						EmitPerManifestation.class))
			.union(
				Utils
					.readPath(
						spark, workingDir + "publication/manifestation",
						EmitPerManifestation.class))
			.union(
				Utils
					.readPath(
						spark, workingDir + "otherresearchproduct/manifestation",
						EmitPerManifestation.class));

		emitPerManifestationDataset
			.groupByKey((MapFunction<EmitPerManifestation, String>) p -> p.getHostedBy(), Encoders.STRING())
			.mapGroups(
				(MapGroupsFunction<String, EmitPerManifestation, EmitPerManifestation>) (k, v) -> v.next(),
				Encoders.bean(EmitPerManifestation.class))
			.write()
			.mode(SaveMode.Overwrite)
			.option("compression", "gzip")
			.json(workingDir + "/datasourcePublisher");
	}
}