From 16c54a96f819999a4d73d3faaa024778beb826fb Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 4 Nov 2020 17:11:32 +0100 Subject: [PATCH] removed pid dump --- .../oa/graph/dump/pid/ResultOrganization.java | 28 --- .../dhp/oa/graph/dump/pid/ResultPidsList.java | 43 ----- .../dhp/oa/graph/dump/pid/ResultProject.java | 35 ---- .../dump/pid/SparkCollectPreparedInfo.java | 80 -------- .../graph/dump/pid/SparkDumpOrganization.java | 87 --------- .../oa/graph/dump/pid/SparkDumpPidAuthor.java | 142 -------------- .../oa/graph/dump/pid/SparkDumpPidResult.java | 82 -------- .../oa/graph/dump/pid/SparkDumpProject.java | 90 --------- .../SparkDumpResultOrganizationRelation.java | 132 ------------- .../pid/SparkDumpResultProjectRelation.java | 129 ------------- .../dump/pid/SparkDumpResultRelation.java | 175 ------------------ .../dump/pid/SparkPrepareResultPids.java | 127 ------------- 12 files changed, 1150 deletions(-) delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultOrganization.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultPidsList.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultProject.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkCollectPreparedInfo.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpOrganization.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidAuthor.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidResult.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpProject.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultOrganizationRelation.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultProjectRelation.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultRelation.java delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkPrepareResultPids.java diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultOrganization.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultOrganization.java deleted file mode 100644 index 7a3129d51..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultOrganization.java +++ /dev/null @@ -1,28 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import java.io.Serializable; -import java.util.List; - -import eu.dnetlib.dhp.schema.oaf.StructuredProperty; - -public class ResultOrganization implements Serializable { - private String resultId; - private List orgPids; - - public String getResultId() { - return resultId; - } - - public void setResultId(String resultId) { - this.resultId = resultId; - } - - public List getOrgPid() { - return orgPids; - } - - public void setOrgPid(List pid) { - this.orgPids = pid; - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultPidsList.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultPidsList.java deleted file mode 100644 index d66c839b7..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultPidsList.java +++ /dev/null @@ -1,43 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import java.io.Serializable; -import java.util.List; - -import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; - -/** - * Needed to create relations between pids in the result. The list of resultAllowedPids will produce relation of type - * source hasOtherMaterialization target (and vice-versa) where source will be identified by one of the pids in the list - * and target by another. A couple of relation between every two nodes. The list of authorAllowedPids will produce - * relation of type source hasAuthor target and target isAuthorOf source for every couple of nodes in result and author. - */ -public class ResultPidsList implements Serializable { - private String resultId; - private List resultAllowedPids; - private List> authorAllowedPids; - - public String getResultId() { - return resultId; - } - - public void setResultId(String resultId) { - this.resultId = resultId; - } - - public List getResultAllowedPids() { - return resultAllowedPids; - } - - public void setResultAllowedPids(List resultAllowedPids) { - this.resultAllowedPids = resultAllowedPids; - } - - public List> getAuthorAllowedPids() { - return authorAllowedPids; - } - - public void setAuthorAllowedPids(List> authorAllowedPids) { - this.authorAllowedPids = authorAllowedPids; - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultProject.java deleted file mode 100644 index aff9143ae..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/ResultProject.java +++ /dev/null @@ -1,35 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import java.io.Serializable; -import java.util.List; - -public class ResultProject implements Serializable { - private String resultId; - private String code; - private List fundings; - - public String getResultId() { - return resultId; - } - - public void setResultId(String resultId) { - this.resultId = resultId; - } - - public String getCode() { - return code; - } - - public void setCode(String code) { - this.code = code; - } - - public List getFundings() { - return fundings; - } - - public void setFundings(List fundings) { - this.fundings = fundings; - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkCollectPreparedInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkCollectPreparedInfo.java deleted file mode 100644 index 079e65ebd..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkCollectPreparedInfo.java +++ /dev/null @@ -1,80 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -import javax.rmi.CORBA.Util; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Result; - -public class SparkCollectPreparedInfo implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkCollectPreparedInfo.class); - - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkCollectPreparedInfo.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_pid/input_collectandsave.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("preparedInfoPath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - collectAndSave(spark, inputPath, outputPath); - - }); - - } - - private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) { - - Utils - .readPath(spark, inputPath + "/publication", ResultPidsList.class) - .union(Utils.readPath(spark, inputPath + "/dataset", ResultPidsList.class)) - .union(Utils.readPath(spark, inputPath + "/software", ResultPidsList.class)) - .union(Utils.readPath(spark, inputPath + "/otherresearchproduct", ResultPidsList.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath); - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpOrganization.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpOrganization.java deleted file mode 100644 index c686e2aae..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpOrganization.java +++ /dev/null @@ -1,87 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; -import eu.dnetlib.dhp.schema.oaf.Organization; - -public class SparkDumpOrganization implements Serializable { - - private static final Logger log = LoggerFactory.getLogger(SparkDumpOrganization.class); - - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkDumpOrganization.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_organization.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - final List allowedOrgPid = new Gson().fromJson(parser.get("allowedOrganizationPids"), List.class); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - dumpPidOrgs(spark, allowedOrgPid, inputPath, outputPath); - - }); - - } - - private static void dumpPidOrgs(SparkSession spark, List allowedOrgPid, String inputPath, - String outputPath) { - Dataset resultPids = Utils.readPath(spark, inputPath, Organization.class); - - resultPids.flatMap((FlatMapFunction) r -> { - List ret = new ArrayList<>(); - r.getPid().forEach(pid -> { - if (allowedOrgPid.contains(pid.getQualifier().getClassid().toLowerCase())) { - ret.add(Entity.newInstance(pid.getQualifier().getClassid() + ":" + pid.getValue())); - } - }); - return ret.iterator(); - }, Encoders.bean(Entity.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/organization"); - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidAuthor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidAuthor.java deleted file mode 100644 index 701bb77d5..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidAuthor.java +++ /dev/null @@ -1,142 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; -import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; -import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Software; - -public class SparkDumpPidAuthor implements Serializable { - - private static final Logger log = LoggerFactory.getLogger(SparkDumpPidAuthor.class); - - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkDumpPidAuthor.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_author.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - final List allowedAuthorPids = new Gson().fromJson(parser.get("allowedAuthorPids"), List.class); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - dumpPidAuthor(spark, inputPath, outputPath, allowedAuthorPids); - - }); - - } - - private static void dumpPidAuthor(SparkSession spark, String inputPath, String outputPath, List aap) { - Dataset publication = Utils.readPath(spark, inputPath + "/publication", Publication.class); - Dataset dataset = Utils - .readPath(spark, inputPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); - Dataset software = Utils.readPath(spark, inputPath + "/software", Software.class); - Dataset other = Utils - .readPath(spark, inputPath + "/otherresearchproduct", OtherResearchProduct.class); - - publication.createOrReplaceTempView("publication"); - dataset.createOrReplaceTempView("dataset"); - software.createOrReplaceTempView("software"); - other.createOrReplaceTempView("other"); - - Dataset pids = spark - .sql( - "SELECT DISTINCT apid.value value , apid.qualifier.classid key " + - "FROM publication " + - "LATERAL VIEW EXPLODE (author) a as auth " + - "LATERAL VIEW EXPLODE (auth.pid) p as apid ") - .as(Encoders.bean(KeyValue.class)) - .union( - spark - .sql( - "SELECT DISTINCT apid.value value , apid.qualifier.classid key " + - "FROM dataset " + - "LATERAL VIEW EXPLODE (author) a as auth " + - "LATERAL VIEW EXPLODE (auth.pid) p as apid ") - .as(Encoders.bean(KeyValue.class))) - .union( - spark - .sql( - "SELECT DISTINCT apid.value value , apid.qualifier.classid key " + - "FROM software " + - "LATERAL VIEW EXPLODE (author) a as auth " + - "LATERAL VIEW EXPLODE (auth.pid) p as apid ") - .as(Encoders.bean(KeyValue.class))) - .union( - spark - .sql( - "SELECT DISTINCT apid.value value , apid.qualifier.classid key " + - "FROM other " + - "LATERAL VIEW EXPLODE (author) a as auth " + - "LATERAL VIEW EXPLODE (auth.pid) p as apid ") - .as(Encoders.bean(KeyValue.class))); - - pids.createOrReplaceTempView("pids"); - - spark - .sql( - "Select distinct key, value " + - "FROM pids") - .as(Encoders.bean(KeyValue.class)) - .filter((FilterFunction) p -> aap.contains(p.getKey())) - .map( - (MapFunction) pid -> Entity.newInstance(pid.getKey() + ":" + pid.getValue()), - Encoders.bean(Entity.class)) - .write() - -// resultPids.flatMap((FlatMapFunction) r-> { -// List ret = new ArrayList<>(); -// r.getAuthorAllowedPids().forEach(pid -> { -// ret.addAll(pid.stream().map(p -> Entity.newInstance(p.getKey() + ":" + p.getValue())).collect(Collectors.toList())); -// -// }); -// return ret.iterator(); -// }, Encoders.bean(Entity.class)) -// .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/author"); - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidResult.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidResult.java deleted file mode 100644 index 8195a0695..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpPidResult.java +++ /dev/null @@ -1,82 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; -import eu.dnetlib.dhp.schema.oaf.Result; - -public class SparkDumpPidResult implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkDumpPidResult.class); - - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkDumpPidResult.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_result.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("preparedInfoPath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - dumpPidEntities(spark, inputPath, outputPath); - - }); - - } - - private static void dumpPidEntities(SparkSession spark, String inputPath, String outputPath) { - Dataset resultPids = Utils.readPath(spark, inputPath, ResultPidsList.class); - - resultPids.flatMap((FlatMapFunction) r -> { - List ret = new ArrayList<>(); - r.getResultAllowedPids().forEach(pid -> { - if (StringUtils.isNoneEmpty(pid.getKey(), pid.getValue())) - ret.add(Entity.newInstance(pid.getKey() + ":" + pid.getValue())); - }); - return ret.iterator(); - }, Encoders.bean(Entity.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/result"); - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpProject.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpProject.java deleted file mode 100644 index f0aac87a6..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpProject.java +++ /dev/null @@ -1,90 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.dom4j.Document; -import org.dom4j.DocumentException; -import org.dom4j.io.SAXReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.pidgraph.Entity; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.Project; - -public class SparkDumpProject implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkDumpOrganization.class); - - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkDumpOrganization.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_project.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - dumpProjects(spark, inputPath, outputPath); - - }); - } - - private static void dumpProjects(SparkSession spark, String inputPath, String outputPath) { - Dataset projectDataset = Utils.readPath(spark, inputPath, Project.class); - - projectDataset.flatMap((FlatMapFunction) project -> { - List projs = new ArrayList<>(); - project.getFundingtree().forEach(fund -> { - try { - projs.add(Utils.getEntity(fund.getValue(), project.getCode().getValue())); - } catch (DocumentException e) { - e.printStackTrace(); - } - }); - return projs.iterator(); - }, Encoders.bean(Entity.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/project"); - - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultOrganizationRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultOrganizationRelation.java deleted file mode 100644 index 6f802f875..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultOrganizationRelation.java +++ /dev/null @@ -1,132 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.sql.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Constants; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Organization; -import eu.dnetlib.dhp.schema.oaf.Relation; -import scala.Tuple2; - -public class SparkDumpResultOrganizationRelation implements Serializable { - - private static final Logger log = LoggerFactory.getLogger(SparkDumpResultOrganizationRelation.class); - - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkDumpResultOrganizationRelation.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_organizationrels.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - final String resultPidListPath = parser.get("preparedInfoPath"); - - final List allowedPids = new Gson().fromJson(parser.get("allowedOrganizationPids"), List.class); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - dumpResultOrganziationRelations(spark, inputPath, resultPidListPath, allowedPids, outputPath); - - }); - - } - - private static void dumpResultOrganziationRelations(SparkSession spark, String inputPath, String preparedInfoPath, - List allowedPids, String outputPath) { - Dataset relations = Utils.readPath(spark, inputPath + "/relation", Relation.class); - Dataset organizations = Utils.readPath(spark, inputPath + "/organization", Organization.class); - Dataset resultPid = Utils.readPath(spark, preparedInfoPath, ResultPidsList.class); - - relations.createOrReplaceTempView("relation"); - organizations.createOrReplaceTempView("organization"); - - Dataset resultOrg = spark - .sql( - "SELECT source resultId , pid orgPids" + - "FROM relation r " + - "JOIN organization o " + - "ON r.target = o.id " + - "WHERE r.datainfo.deletedbyinference = false " + - "AND o.datainfo.deletedbyinference = false " + - "AND lower(relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() + "'") - .as(Encoders.bean(ResultOrganization.class)); - - resultOrg - .joinWith(resultPid, resultOrg.col("resultId").equalTo(resultPid.col("resultId")), "left") - .flatMap( - (FlatMapFunction, eu.dnetlib.dhp.schema.dump.oaf.graph.Relation>) value -> { - List relList = new ArrayList<>(); - Optional orel = Optional.ofNullable(value._2()); - if (orel.isPresent()) { - List orgList = value - ._1() - .getOrgPid() - .stream() - .filter(p -> allowedPids.contains(p.getQualifier().getClassid())) - .map(pid -> pid.getQualifier().getClassid() + ":" + pid.getValue()) - .collect(Collectors.toList()); - if (orgList.size() > 0) { - List resList = orel.get().getResultAllowedPids(); - for (int i = 0; i < resList.size(); i++) { - String pid = resList.get(i).getKey() + ":" + resList.get(i).getValue(); - for (int j = 0; j < orgList.size(); j++) { - relList - .addAll( - Utils - .getRelationPair( - pid, orgList.get(j), Constants.RESULT, Constants.ORGANIZATION, - ModelConstants.AFFILIATION, ModelConstants.HAS_AUTHOR_INSTITUTION, - ModelConstants.IS_AUTHOR_INSTITUTION_OF)); - - } - } - } - } - - return relList.iterator(); - }, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/relationOrganization"); - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultProjectRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultProjectRelation.java deleted file mode 100644 index 73fc88a5c..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultProjectRelation.java +++ /dev/null @@ -1,129 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.dom4j.DocumentException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Constants; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Project; -import eu.dnetlib.dhp.schema.oaf.Relation; -import scala.Tuple2; - -public class SparkDumpResultProjectRelation implements Serializable { - - private static final Logger log = LoggerFactory.getLogger(SparkDumpResultProjectRelation.class); - - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkDumpResultProjectRelation.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_projectrels.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - final String resultPidListPath = parser.get("preparedInfoPath"); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - dumpResultProjectRelations(spark, inputPath, resultPidListPath, outputPath); - - }); - - } - - private static void dumpResultProjectRelations(SparkSession spark, String inputPath, String preparedInfoPath, - String outputPath) { - Dataset relations = Utils.readPath(spark, inputPath + "/relation", Relation.class); - Dataset projects = Utils.readPath(spark, inputPath + "/project", Project.class); - Dataset resultPid = Utils.readPath(spark, preparedInfoPath, ResultPidsList.class); - - relations.createOrReplaceTempView("relation"); - projects.createOrReplaceTempView("project"); - - Dataset resultProj = spark - .sql( - "SELECT source resultId , code, fundingtree.value fundings" + - "FROM relation r " + - "JOIN project p " + - "ON r.target = p.id " + - "WHERE r.datainfo.deletedbyinference = false " + - "AND lower(relclass) = '" + ModelConstants.IS_PRODUCED_BY.toLowerCase() + "'") - .as(Encoders.bean(ResultProject.class)); - - resultProj - .joinWith(resultPid, resultProj.col("resultId").equalTo(resultPid.col("resultId")), "left") - .flatMap( - (FlatMapFunction, eu.dnetlib.dhp.schema.dump.oaf.graph.Relation>) value -> { - List relList = new ArrayList<>(); - Optional orel = Optional.ofNullable(value._2()); - if (orel.isPresent()) { - List projList = new ArrayList<>(); - String code = value._1().getCode(); - for (String fund : value._1().getFundings()) { - projList.add(Utils.getEntity(fund, code).getId()); - } - - List resList = orel.get().getResultAllowedPids(); - for (int i = 0; i < resList.size(); i++) { - String pid = resList.get(i).getKey() + ":" + resList.get(i).getValue(); - for (int j = 0; j < projList.size(); j++) { - relList - .addAll( - Utils - .getRelationPair( - pid, projList.get(j), Constants.RESULT, Constants.PROJECT, - ModelConstants.OUTCOME, ModelConstants.IS_PRODUCED_BY, - ModelConstants.PRODUCES)); - - } - } - - } - - return relList.iterator(); - }, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/relationProject"); - } - -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultRelation.java deleted file mode 100644 index 6358df48b..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkDumpResultRelation.java +++ /dev/null @@ -1,175 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.*; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.MapGroupsFunction; -import org.apache.spark.sql.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Constants; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.oa.graph.dump.community.ResultProject; -import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; -import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation; -import eu.dnetlib.dhp.schema.oaf.Project; - -public class SparkDumpResultRelation implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkDumpResultRelation.class); - - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkDumpResultRelation.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_pid/input_dump_result.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("preparedInfoPath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - dumpPidRelations(spark, inputPath, outputPath); - - }); - - } - - private static Dataset distinctRelations(Dataset rels) { - return rels - .filter(getRelationFilterFunction()) - .groupByKey( - (MapFunction) r -> String - .join( - r.getSource().getId(), r.getTarget().getId(), r.getReltype().getName(), - r.getReltype().getType()), - Encoders.STRING()) - .mapGroups( - (MapGroupsFunction) (key, relationIterator) -> relationIterator.next(), - Encoders.bean(Relation.class)); - } - - private static FilterFunction getRelationFilterFunction() { - return (FilterFunction) r -> StringUtils.isNotBlank(r.getSource().getId()) || - StringUtils.isNotBlank(r.getTarget().getId()) || - StringUtils.isNotBlank(r.getReltype().getName()) || - StringUtils.isNotBlank(r.getReltype().getType()); - } - - private static void dumpPidRelations(SparkSession spark, String inputPath, String outputPath) { - Dataset resultPids = Utils.readPath(spark, inputPath, ResultPidsList.class); - - distinctRelations(resultPids.flatMap((FlatMapFunction) r -> { - List ret = new ArrayList<>(); - List resPids = r.getResultAllowedPids(); - List> authPids = r.getAuthorAllowedPids(); - - for (int i = 0; i < resPids.size() - 1; i++) { - String pid = resPids.get(i).getKey() + ":" + resPids.get(i).getValue(); - for (int j = i + 1; j < resPids.size(); j++) { - ret - .addAll( - Utils - .getRelationPair( - pid, resPids.get(j).getKey() + ":" + resPids.get(j).getValue(), - Constants.RESULT, Constants.RESULT, Constants.SIMILARITY, - Constants.RESPID_RESPID_RELATION, Constants.RESPID_RESPID_RELATION)); - } - } - - for (int i = 0; i < authPids.size() - 1; i++) { - for (int j = i + 1; j < authPids.size(); j++) { - ret.addAll(getAuthRelations(authPids.get(i), authPids.get(j))); - } - } - - for (int i = 0; i < resPids.size(); i++) { - String pid = resPids.get(i).getKey() + ":" + resPids.get(i).getValue(); - for (int j = 0; j < authPids.size(); j++) { - for (int k = 0; k < authPids.get(j).size(); k++) { - ret - .addAll( - Utils - .getRelationPair( - pid, - authPids.get(j).get(k).getKey() + ":" + authPids.get(j).get(k).getValue(), - Constants.RESULT, Constants.AUTHOR, Constants.AUTHORSHIP, - Constants.RES_AUTHOR_RELATION, Constants.AUTHOR_RES_RELATION)); - } - - } - } - return ret.iterator(); - }, Encoders.bean(Relation.class))) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(outputPath + "/relation"); - } - - private static List getAuthRelations(List a1, List a2) { - List ret = new ArrayList<>(); - if (a1.size() > 1) { - ret.addAll(getSameAs(a1)); - } - if (a2.size() > 1) { - ret.addAll(getSameAs(a2)); - } - for (int i = 0; i < a1.size(); i++) { - String pid = a1.get(i).getKey() + ":" + a1.get(i).getValue(); - for (int j = 0; j < a2.size(); j++) { - ret - .addAll( - Utils - .getRelationPair( - pid, a2.get(j).getKey() + ":" + a2.get(j).getValue(), - Constants.AUTHOR, Constants.AUTHOR, Constants.AUTHORSHIP, - Constants.AUTHOR_AUTHOR_RELATION, Constants.AUTHOR_AUTHOR_RELATION)); - } - } - - return ret; - } - - private static List getSameAs(List a1) { - List ret = new ArrayList<>(); - for (int i = 0; i < a1.size() - 1; i++) { - ret - .addAll( - Utils - .getRelationPair( - a1.get(i).getKey() + ":" + a1.get(i).getValue(), - a1.get(i + 1).getKey() + ":" + a1.get(i + 1).getValue(), - Constants.AUTHOR, Constants.AUTHOR, Constants.SIMILARITY, - Constants.SAME_AS, Constants.SAME_AS)); - } - return ret; - } -} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkPrepareResultPids.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkPrepareResultPids.java deleted file mode 100644 index 68adf8257..000000000 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/pid/SparkPrepareResultPids.java +++ /dev/null @@ -1,127 +0,0 @@ - -package eu.dnetlib.dhp.oa.graph.dump.pid; - -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.Optional; - -import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Result; - -public class SparkPrepareResultPids implements Serializable { - - private static final Logger log = LoggerFactory.getLogger(SparkPrepareResultPids.class); - - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkPrepareResultPids.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump_pid/input_parameters.json")); - - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); - - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); - - final String resultClassName = parser.get("resultTableName"); - log.info("resultTableName: {}", resultClassName); - - final List allowedResultPid = new Gson().fromJson(parser.get("allowedResultPids"), List.class); - final List allowedAuthorPid = new Gson().fromJson(parser.get("allowedAuthorPids"), List.class); - - final String resultType = resultClassName.substring(resultClassName.lastIndexOf(".") + 1).toLowerCase(); - log.info("resultType: {}", resultType); - - Class inputClazz = (Class) Class.forName(resultClassName); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - preparePidEntities( - spark, inputPath, outputPath + "/" + resultType, inputClazz, allowedResultPid, allowedAuthorPid); - - }); - - } - - private static void preparePidEntities(SparkSession spark, String inputPath, String outputPath, - Class inputClazz, List allowedResultPid, - List allowedAuthorPid) { - - Dataset result = Utils.readPath(spark, inputPath, inputClazz); - - result.map((MapFunction) res -> { - ResultPidsList ret = new ResultPidsList(); - ret.setResultId(res.getId()); - List pidList = new ArrayList<>(); - Optional - .ofNullable(res.getPid()) - .ifPresent(pids -> pids.forEach(pid -> { - if (allowedResultPid.contains(pid.getQualifier().getClassid().toLowerCase())) { - pidList.add(KeyValue.newInstance(pid.getQualifier().getClassid(), pid.getValue())); - } - })); - ret.setResultAllowedPids(pidList); - List> authorPidList = new ArrayList<>(); - Optional - .ofNullable(res.getAuthor()) - .ifPresent(authors -> authors.forEach(a -> { - Optional - .ofNullable(a.getPid()) - .ifPresent(pids -> pids.forEach(p -> { - List authorPids = new ArrayList<>(); - if (allowedAuthorPid.contains(p.getQualifier().getClassid().toLowerCase())) { - authorPids.add(KeyValue.newInstance(p.getQualifier().getClassid(), p.getValue())); - } - if (authorPids.size() > 0) { - authorPidList.add(authorPids); - } - })); - })); - ret.setAuthorAllowedPids(authorPidList); - - if (authorPidList.size() == 0 && pidList.size() == 0) { - return null; - } - return ret; - }, Encoders.bean(ResultPidsList.class)) - .filter(Objects::nonNull) - .write() - .mode(SaveMode.Append) - .option("compression", "gzip") - .json(outputPath); - } - -}