From bef79d3bdf919d6958e32cfd4c4d3af6935d97b4 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 24 Aug 2020 16:49:38 +0200 Subject: [PATCH] first attempt at the dump of the PID graph --- .../oa/graph/dump/pid/ResultOrganization.java | 28 +++ .../dhp/oa/graph/dump/pid/ResultPidsList.java | 43 +++++ .../dhp/oa/graph/dump/pid/ResultProject.java | 35 ++++ .../dump/pid/SparkCollectPreparedInfo.java | 80 ++++++++ .../graph/dump/pid/SparkDumpOrganization.java | 87 +++++++++ .../oa/graph/dump/pid/SparkDumpPidAuthor.java | 142 ++++++++++++++ .../oa/graph/dump/pid/SparkDumpPidResult.java | 82 ++++++++ .../oa/graph/dump/pid/SparkDumpProject.java | 90 +++++++++ .../SparkDumpResultOrganizationRelation.java | 132 +++++++++++++ .../pid/SparkDumpResultProjectRelation.java | 129 +++++++++++++ .../dump/pid/SparkDumpResultRelation.java | 175 ++++++++++++++++++ .../dump/pid/SparkPrepareResultPids.java | 127 +++++++++++++ 12 files changed, 1150 insertions(+) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultOrganization.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultPidsList.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultProject.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkCollectPreparedInfo.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpOrganization.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidAuthor.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidResult.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpProject.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultOrganizationRelation.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultProjectRelation.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultRelation.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkPrepareResultPids.java diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultOrganization.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultOrganization.java new file mode 100644 index 0000000000..7a3129d518 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultOrganization.java @@ -0,0 +1,28 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import java.io.Serializable; +import java.util.List; + +import eu.dnetlib.dhp.schema.oaf.StructuredProperty; + +public class ResultOrganization implements Serializable { + private String resultId; + private List<StructuredProperty> orgPids; + + public String getResultId() { + return resultId; + } + + public void setResultId(String resultId) { + this.resultId = resultId; + } + + public List<StructuredProperty> getOrgPid() { + return orgPids; + } + + public void setOrgPid(List<StructuredProperty> pid) { + this.orgPids = pid; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultPidsList.java
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultPidsList.java new file mode 100644 index 0000000000..d66c839b7f --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultPidsList.java @@ -0,0 +1,43 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import java.io.Serializable; +import java.util.List; + +import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; + +/** + * Needed to create relations between the pids of a result. The list resultAllowedPids produces relations of type + * source hasOtherMaterialization target (and vice versa), where source is identified by one of the pids in the list + * and target by another: a pair of relations between every two nodes. The list authorAllowedPids produces relations + * of type source hasAuthor target and target isAuthorOf source for every pair of result and author nodes. + */ +public class ResultPidsList implements Serializable { + private String resultId; + private List<KeyValue> resultAllowedPids; + private List<List<KeyValue>> authorAllowedPids; + + public String getResultId() { + return resultId; + } + + public void setResultId(String resultId) { + this.resultId = resultId; + } + + public List<KeyValue> getResultAllowedPids() { + return resultAllowedPids; + } + + public void setResultAllowedPids(List<KeyValue> resultAllowedPids) { + this.resultAllowedPids = resultAllowedPids; + } + + public List<List<KeyValue>> getAuthorAllowedPids() { + return authorAllowedPids; + } + + public void setAuthorAllowedPids(List<List<KeyValue>> authorAllowedPids) { + this.authorAllowedPids = authorAllowedPids; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultProject.java new file mode 100644 index 0000000000..aff9143aee --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultProject.java @@ -0,0 +1,35 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import java.io.Serializable; +import java.util.List; + +public class ResultProject implements Serializable { + private String resultId; + private String code; + private List<String> fundings; + + public String getResultId() { + return resultId; + } + + public void setResultId(String resultId) { + this.resultId = resultId; + } + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + public List<String> getFundings() { + return fundings; + } + + public void setFundings(List<String> fundings) { + this.fundings = fundings; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkCollectPreparedInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkCollectPreparedInfo.java new file mode 100644 index 0000000000..079e65ebd3 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkCollectPreparedInfo.java @@ -0,0 +1,80 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import
org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class SparkCollectPreparedInfo implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkCollectPreparedInfo.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkCollectPreparedInfo.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_pid/input_collectandsave.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("preparedInfoPath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + collectAndSave(spark, inputPath, outputPath); + + }); + + } + + private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) { + + Utils + .readPath(spark, inputPath + "/publication", ResultPidsList.class) + .union(Utils.readPath(spark, inputPath + "/dataset", ResultPidsList.class)) + .union(Utils.readPath(spark, inputPath + "/software", ResultPidsList.class)) + .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", ResultPidsList.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpOrganization.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpOrganization.java new file mode 100644 index 0000000000..c686e2aaea --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpOrganization.java @@ -0,0 +1,87 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; +import eu.dnetlib.dhp.schema.oaf.Organization; + +public class SparkDumpOrganization implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkDumpOrganization.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = 
IOUtils + .toString( + SparkDumpOrganization.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_organization.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final List<String> allowedOrgPid = new Gson().fromJson(parser.get("allowedOrganizationPids"), List.class); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + dumpPidOrgs(spark, allowedOrgPid, inputPath, outputPath); + + }); + + } + + private static void dumpPidOrgs(SparkSession spark, List<String> allowedOrgPid, String inputPath, + String outputPath) { + Dataset<Organization> organizations = Utils.readPath(spark, inputPath, Organization.class); + + organizations.flatMap((FlatMapFunction<Organization, Entity>) r -> { + List<Entity> ret = new ArrayList<>(); + r.getPid().forEach(pid -> { + if (allowedOrgPid.contains(pid.getQualifier().getClassid().toLowerCase())) { + ret.add(Entity.newInstance(pid.getQualifier().getClassid() + ":" + pid.getValue())); + } + }); + return ret.iterator(); + }, Encoders.bean(Entity.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/organization"); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidAuthor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidAuthor.java new file mode 100644 index 0000000000..701bb77d5c --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidAuthor.java @@ -0,0 +1,142 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; +import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; +import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; +import eu.dnetlib.dhp.schema.oaf.Publication; +import eu.dnetlib.dhp.schema.oaf.Software; + +public class SparkDumpPidAuthor implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkDumpPidAuthor.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkDumpPidAuthor.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_author.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); +
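// The switch names read below (isSparkSessionManaged, sourcePath, outputPath, allowedAuthorPids) are the ones declared in input_dump_author.json; the parser only recognizes the switches defined in that resource.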
parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final List<String> allowedAuthorPids = new Gson().fromJson(parser.get("allowedAuthorPids"), List.class); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + dumpPidAuthor(spark, inputPath, outputPath, allowedAuthorPids); + + }); + + } + + private static void dumpPidAuthor(SparkSession spark, String inputPath, String outputPath, List<String> aap) { + Dataset<Publication> publication = Utils.readPath(spark, inputPath + "/publication", Publication.class); + Dataset<eu.dnetlib.dhp.schema.oaf.Dataset> dataset = Utils + .readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); + Dataset<Software> software = Utils.readPath(spark, inputPath + "/software", Software.class); + Dataset<OtherResearchProduct> other = Utils + .readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class); + + publication.createOrReplaceTempView("publication"); + dataset.createOrReplaceTempView("dataset"); + software.createOrReplaceTempView("software"); + other.createOrReplaceTempView("other"); + + Dataset<KeyValue> pids = spark + .sql( + "SELECT DISTINCT apid.value value, apid.qualifier.classid key " + + "FROM publication " + + "LATERAL VIEW EXPLODE (author) a as auth " + + "LATERAL VIEW EXPLODE (auth.pid) p as apid ") + .as(Encoders.bean(KeyValue.class)) + .union( + spark + .sql( + "SELECT DISTINCT apid.value value, apid.qualifier.classid key " + + "FROM dataset " + + "LATERAL VIEW EXPLODE (author) a as auth " + + "LATERAL VIEW EXPLODE (auth.pid) p as apid ") + .as(Encoders.bean(KeyValue.class))) + .union( + spark + .sql( + "SELECT DISTINCT apid.value value, apid.qualifier.classid key " + + "FROM software " + + "LATERAL VIEW EXPLODE (author) a as auth " + + "LATERAL VIEW EXPLODE (auth.pid) p as apid ") + .as(Encoders.bean(KeyValue.class))) + .union( + spark + .sql( + "SELECT DISTINCT apid.value value, apid.qualifier.classid key " + + "FROM other " + + "LATERAL VIEW EXPLODE (author) a as auth " + + "LATERAL VIEW EXPLODE (auth.pid) p as apid ") + .as(Encoders.bean(KeyValue.class))); + + pids.createOrReplaceTempView("pids"); + + spark + .sql( + "SELECT DISTINCT key, value " + + "FROM pids") + .as(Encoders.bean(KeyValue.class)) + .filter((FilterFunction<KeyValue>) p -> aap.contains(p.getKey())) + .map( + (MapFunction<KeyValue, Entity>) pid -> Entity.newInstance(pid.getKey() + ":" + pid.getValue()), + Encoders.bean(Entity.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/author"); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidResult.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidResult.java new file mode 100644 index 0000000000..8195a06958 --- /dev/null +++
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidResult.java @@ -0,0 +1,82 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class SparkDumpPidResult implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkDumpPidResult.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkDumpPidResult.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_result.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("preparedInfoPath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + dumpPidEntities(spark, inputPath, outputPath); + + }); + + } + + private static void dumpPidEntities(SparkSession spark, String inputPath, String outputPath) { + Dataset<ResultPidsList> resultPids = Utils.readPath(spark, inputPath, ResultPidsList.class); + + resultPids.flatMap((FlatMapFunction<ResultPidsList, Entity>) r -> { + List<Entity> ret = new ArrayList<>(); + r.getResultAllowedPids().forEach(pid -> { + if (StringUtils.isNoneEmpty(pid.getKey(), pid.getValue())) + ret.add(Entity.newInstance(pid.getKey() + ":" + pid.getValue())); + }); + return ret.iterator(); + }, Encoders.bean(Entity.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/result"); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpProject.java new file mode 100644 index 0000000000..f0aac87a61 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpProject.java @@ -0,0 +1,90 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import
org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.io.SAXReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; +import eu.dnetlib.dhp.schema.oaf.Project; + +public class SparkDumpProject implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkDumpProject.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkDumpProject.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_project.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + dumpProjects(spark, inputPath, outputPath); + + }); + } + + private static void dumpProjects(SparkSession spark, String inputPath, String outputPath) { + Dataset<Project> projectDataset = Utils.readPath(spark, inputPath, Project.class); + + projectDataset.flatMap((FlatMapFunction<Project, Entity>) project -> { + List<Entity> projs = new ArrayList<>(); + project.getFundingtree().forEach(fund -> { + try { + projs.add(Utils.getEntity(fund.getValue(), project.getCode().getValue())); + } catch (DocumentException e) { + e.printStackTrace(); + } + }); + return projs.iterator(); + }, Encoders.bean(Entity.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/project"); + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultOrganizationRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultOrganizationRelation.java new file mode 100644 index 0000000000..6f802f875d --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultOrganizationRelation.java @@ -0,0 +1,132 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Constants; +import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Organization; +import eu.dnetlib.dhp.schema.oaf.Relation; +import scala.Tuple2; + +public class SparkDumpResultOrganizationRelation implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkDumpResultOrganizationRelation.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkDumpResultOrganizationRelation.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_organizationrels.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String resultPidListPath = parser.get("preparedInfoPath"); + + final List<String> allowedPids = new Gson().fromJson(parser.get("allowedOrganizationPids"), List.class); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + dumpResultOrganizationRelations(spark, inputPath, resultPidListPath, allowedPids, outputPath); + + }); + + } + + private static void dumpResultOrganizationRelations(SparkSession spark, String inputPath, String preparedInfoPath, + List<String> allowedPids, String outputPath) { + Dataset<Relation> relations = Utils.readPath(spark, inputPath + "/relation", Relation.class); + Dataset<Organization> organizations = Utils.readPath(spark, inputPath + "/organization", Organization.class); + Dataset<ResultPidsList> resultPid = Utils.readPath(spark, preparedInfoPath, ResultPidsList.class); + + relations.createOrReplaceTempView("relation"); + organizations.createOrReplaceTempView("organization"); + + Dataset<ResultOrganization> resultOrg = spark + .sql( + "SELECT r.source resultId, o.pid orgPid " + + "FROM relation r " + + "JOIN organization o " + + "ON r.target = o.id " + + "WHERE r.datainfo.deletedbyinference = false " + + "AND o.datainfo.deletedbyinference = false " + + "AND lower(r.relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() + "'") + .as(Encoders.bean(ResultOrganization.class)); + + resultOrg + .joinWith(resultPid, resultOrg.col("resultId").equalTo(resultPid.col("resultId")), "left") + .flatMap( + (FlatMapFunction<Tuple2<ResultOrganization, ResultPidsList>, eu.dnetlib.dhp.schema.dump.oaf.graph.Relation>) value -> { + List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList = new ArrayList<>(); + Optional<ResultPidsList> orel = Optional.ofNullable(value._2()); + if (orel.isPresent()) { + List<String> orgList = value + ._1() + .getOrgPid() + .stream() + .filter(p -> allowedPids.contains(p.getQualifier().getClassid().toLowerCase())) + .map(pid -> pid.getQualifier().getClassid() + ":" + pid.getValue()) + .collect(Collectors.toList()); + if (orgList.size() > 0) { + List<KeyValue> resList = orel.get().getResultAllowedPids(); + for (int i = 0; i < resList.size(); i++) { + String pid = resList.get(i).getKey() + ":" + resList.get(i).getValue(); + for (int j = 0; j < orgList.size(); j++) { + relList + .addAll( + Utils + .getRelationPair( + pid, orgList.get(j), Constants.RESULT, Constants.ORGANIZATION, + ModelConstants.AFFILIATION,
ModelConstants.HAS_AUTHOR_INSTITUTION, + ModelConstants.IS_AUTHOR_INSTITUTION_OF)); + + } + } + } + } + + return relList.iterator(); + }, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/relationOrganization"); + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultProjectRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultProjectRelation.java new file mode 100644 index 0000000000..73fc88a5c2 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultProjectRelation.java @@ -0,0 +1,129 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.dom4j.DocumentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Constants; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Relation; +import scala.Tuple2; + +public class SparkDumpResultProjectRelation implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkDumpResultProjectRelation.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkDumpResultProjectRelation.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_projectrels.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String resultPidListPath = parser.get("preparedInfoPath"); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + dumpResultProjectRelations(spark, inputPath, resultPidListPath, outputPath); + + }); + + } + + private static void dumpResultProjectRelations(SparkSession spark, String inputPath, String preparedInfoPath, + String outputPath) { + Dataset<Relation> relations = Utils.readPath(spark, inputPath + "/relation", Relation.class); + Dataset<Project> projects = Utils.readPath(spark, inputPath + "/project", Project.class); + Dataset<ResultPidsList> resultPid = Utils.readPath(spark, preparedInfoPath, ResultPidsList.class); + + relations.createOrReplaceTempView("relation"); +
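// The temp views let the result-project link be computed in SQL below: relations with relclass isProducedBy are joined with project on relation.target = project.id, yielding one ResultProject row per funded result.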
projects.createOrReplaceTempView("project"); + + Dataset resultProj = spark + .sql( + "SELECT source resultId , code, fundingtree.value fundings" + + "FROM relation r " + + "JOIN project p " + + "ON r.target = p.id " + + "WHERE r.datainfo.deletedbyinference = false " + + "AND lower(relclass) = '" + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'") + .as(Encoders.bean(ResultProject.class)); + + resultProj + .joinWith(resultPid, resultProj.col("resultId").equalTo(resultPid.col("resultId")), "left") + .flatMap( + (FlatMapFunction, eu.dnetlib.dhp.schema.dump.oaf.graph.Relation>) value -> { + List relList = new ArrayList<>(); + Optional orel = Optional.ofNullable(value._2()); + if (orel.isPresent()) { + List projList = new ArrayList<>(); + String code = value._1().getCode(); + for (String fund : value._1().getFundings()) { + projList.add(Utils.getEntity(fund, code).getId()); + } + + List resList = orel.get().getResultAllowedPids(); + for (int i = 0; i < resList.size(); i++) { + String pid = resList.get(i).getKey() + ":" + resList.get(i).getValue(); + for (int j = 0; j < projList.size(); j++) { + relList + .addAll( + Utils + .getRelationPair( + pid, projList.get(j), Constants.RESULT, Constants.PROJECT, + ModelConstants.OUTCOME, ModelConstants.IS_PRODUCED_BY, + ModelConstants.PRODUCES)); + + } + } + + } + + return relList.iterator(); + }, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/relationProject"); + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultRelation.java new file mode 100644 index 0000000000..6358df48ba --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultRelation.java @@ -0,0 +1,175 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.*; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.MapGroupsFunction; +import org.apache.spark.sql.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Constants; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.oa.graph.dump.community.ResultProject; +import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation; +import eu.dnetlib.dhp.schema.oaf.Project; + +public class SparkDumpResultRelation implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkDumpResultRelation.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkDumpResultRelation.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_result.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + 
.ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("preparedInfoPath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + dumpPidRelations(spark, inputPath, outputPath); + + }); + + } + + private static Dataset distinctRelations(Dataset rels) { + return rels + .filter(getRelationFilterFunction()) + .groupByKey( + (MapFunction) r -> String + .join( + r.getSource().getId(), r.getTarget().getId(), r.getReltype().getName(), + r.getReltype().getType()), + Encoders.STRING()) + .mapGroups( + (MapGroupsFunction) (key, relationIterator) -> relationIterator.next(), + Encoders.bean(Relation.class)); + } + + private static FilterFunction getRelationFilterFunction() { + return (FilterFunction) r -> StringUtils.isNotBlank(r.getSource().getId()) || + StringUtils.isNotBlank(r.getTarget().getId()) || + StringUtils.isNotBlank(r.getReltype().getName()) || + StringUtils.isNotBlank(r.getReltype().getType()); + } + + private static void dumpPidRelations(SparkSession spark, String inputPath, String outputPath) { + Dataset resultPids = Utils.readPath(spark, inputPath, ResultPidsList.class); + + distinctRelations(resultPids.flatMap((FlatMapFunction) r -> { + List ret = new ArrayList<>(); + List resPids = r.getResultAllowedPids(); + List> authPids = r.getAuthorAllowedPids(); + + for (int i = 0; i < resPids.size() - 1; i++) { + String pid = resPids.get(i).getKey() + ":" + resPids.get(i).getValue(); + for (int j = i + 1; j < resPids.size(); j++) { + ret + .addAll( + Utils + .getRelationPair( + pid, resPids.get(j).getKey() + ":" + resPids.get(j).getValue(), + Constants.RESULT, Constants.RESULT, Constants.SIMILARITY, + Constants.RESPID_RESPID_RELATION, Constants.RESPID_RESPID_RELATION)); + } + } + + for (int i = 0; i < authPids.size() - 1; i++) { + for (int j = i + 1; j < authPids.size(); j++) { + ret.addAll(getAuthRelations(authPids.get(i), authPids.get(j))); + } + } + + for (int i = 0; i < resPids.size(); i++) { + String pid = resPids.get(i).getKey() + ":" + resPids.get(i).getValue(); + for (int j = 0; j < authPids.size(); j++) { + for (int k = 0; k < authPids.get(j).size(); k++) { + ret + .addAll( + Utils + .getRelationPair( + pid, + authPids.get(j).get(k).getKey() + ":" + authPids.get(j).get(k).getValue(), + Constants.RESULT, Constants.AUTHOR, Constants.AUTHORSHIP, + Constants.RES_AUTHOR_RELATION, Constants.AUTHOR_RES_RELATION)); + } + + } + } + return ret.iterator(); + }, Encoders.bean(Relation.class))) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/relation"); + } + + private static List getAuthRelations(List a1, List a2) { + List ret = new ArrayList<>(); + if (a1.size() > 1) { + ret.addAll(getSameAs(a1)); + } + if (a2.size() > 1) { + ret.addAll(getSameAs(a2)); + } + for (int i = 0; i < a1.size(); i++) { + String pid = a1.get(i).getKey() + ":" + a1.get(i).getValue(); + for (int j = 0; j < a2.size(); j++) { + ret + .addAll( + Utils + .getRelationPair( + pid, a2.get(j).getKey() + ":" + a2.get(j).getValue(), + Constants.AUTHOR, Constants.AUTHOR, Constants.AUTHORSHIP, + Constants.AUTHOR_AUTHOR_RELATION, Constants.AUTHOR_AUTHOR_RELATION)); + } + } + + return ret; + } + + private static 
List<Relation> getSameAs(List<KeyValue> a1) { + List<Relation> ret = new ArrayList<>(); + for (int i = 0; i < a1.size() - 1; i++) { + ret + .addAll( + Utils + .getRelationPair( + a1.get(i).getKey() + ":" + a1.get(i).getValue(), + a1.get(i + 1).getKey() + ":" + a1.get(i + 1).getValue(), + Constants.AUTHOR, Constants.AUTHOR, Constants.SIMILARITY, + Constants.SAME_AS, Constants.SAME_AS)); + } + return ret; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkPrepareResultPids.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkPrepareResultPids.java new file mode 100644 index 0000000000..68adf82573 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkPrepareResultPids.java @@ -0,0 +1,127 @@ + +package eu.dnetlib.dhp.oa.graph.dump.pid; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Result; + +public class SparkPrepareResultPids implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultPids.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkPrepareResultPids.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_pid/input_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + final String resultClassName = parser.get("resultTableName"); + log.info("resultTableName: {}", resultClassName); + + final List<String> allowedResultPid = new Gson().fromJson(parser.get("allowedResultPids"), List.class); + final List<String> allowedAuthorPid = new Gson().fromJson(parser.get("allowedAuthorPids"), List.class); + + final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); + log.info("resultType: {}", resultType); + + Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + preparePidEntities( + spark, inputPath, outputPath + "/" + resultType, inputClazz, allowedResultPid, allowedAuthorPid); + + }); + + } + + private static <R extends Result> void preparePidEntities(SparkSession spark, String inputPath, String outputPath, + Class<R> inputClazz, List<String> allowedResultPid, + List<String> allowedAuthorPid) {
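// For each result, keep the result-level pids whose classid appears in allowedResultPid and, author by author, the author pids whose classid appears in allowedAuthorPid; results left with no pid at all are mapped to null and dropped by the filter below.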
+ + Dataset<R> result = Utils.readPath(spark, inputPath, inputClazz); + + result.map((MapFunction<R, ResultPidsList>) res -> { + ResultPidsList ret = new ResultPidsList(); + ret.setResultId(res.getId()); + List<KeyValue> pidList = new ArrayList<>(); + Optional + .ofNullable(res.getPid()) + .ifPresent(pids -> pids.forEach(pid -> { + if (allowedResultPid.contains(pid.getQualifier().getClassid().toLowerCase())) { + pidList.add(KeyValue.newInstance(pid.getQualifier().getClassid(), pid.getValue())); + } + })); + ret.setResultAllowedPids(pidList); + List<List<KeyValue>> authorPidList = new ArrayList<>(); + Optional + .ofNullable(res.getAuthor()) + .ifPresent(authors -> authors.forEach(a -> { + List<KeyValue> authorPids = new ArrayList<>(); + Optional + .ofNullable(a.getPid()) + .ifPresent(pids -> pids.forEach(p -> { + if (allowedAuthorPid.contains(p.getQualifier().getClassid().toLowerCase())) { + authorPids.add(KeyValue.newInstance(p.getQualifier().getClassid(), p.getValue())); + } + })); + if (authorPids.size() > 0) { + authorPidList.add(authorPids); + } + })); + ret.setAuthorAllowedPids(authorPidList); + + if (authorPidList.size() == 0 && pidList.size() == 0) { + return null; + } + return ret; + }, Encoders.bean(ResultPidsList.class)) + .filter(Objects::nonNull) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(outputPath); + } + +}
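
Note on the helper used throughout: every relation dump above delegates to Utils.getRelationPair(source, target, sourceType, targetType, semantics, directRelClass, inverseRelClass), which lives elsewhere in dhp-graph-mapper and is not part of this patch. A minimal sketch of the contract the calls above assume, in the dump Relation model; Node.newInstance and RelType.newInstance are assumed factories, mirroring the Entity.newInstance / KeyValue.newInstance pattern visible in this patch, and the mapping of relClass to RelType.name and semantics to RelType.type is inferred from distinctRelations above:

	public static List<Relation> getRelationPair(String source, String target,
		String sourceType, String targetType, String semantics,
		String directRelClass, String inverseRelClass) {
		// one relation per direction, so the dumped graph can be navigated both ways
		List<Relation> pair = new ArrayList<>();
		pair.add(getRelation(source, target, sourceType, targetType, semantics, directRelClass));
		pair.add(getRelation(target, source, targetType, sourceType, semantics, inverseRelClass));
		return pair;
	}

	private static Relation getRelation(String source, String target, String sourceType,
		String targetType, String semantics, String relClass) {
		Relation r = new Relation();
		r.setSource(Node.newInstance(source, sourceType)); // assumed factory
		r.setTarget(Node.newInstance(target, targetType)); // assumed factory
		r.setReltype(RelType.newInstance(relClass, semantics)); // assumed factory
		return r;
	}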