From f747e303acb3ffc6ae272a5285666ebde5e0690d Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Fri, 30 Oct 2020 14:13:45 +0100 Subject: [PATCH] classes for dumping of the graph as ttl file --- .../oa/graph/dump/ttl/OrganizationInfo.java | 60 ++++++++++++- .../dnetlib/dhp/oa/graph/dump/ttl/Pids.java | 23 ++++- .../ttl/SparkPrepareOrganizationInfo.java | 86 ++++++++++++++++++- 3 files changed, 166 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/OrganizationInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/OrganizationInfo.java index 31aa2b0a46..6e897d34f0 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/OrganizationInfo.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/OrganizationInfo.java @@ -1,4 +1,62 @@ + package eu.dnetlib.dhp.oa.graph.dump.ttl; -public class OrganizationInfo { +import java.io.Serializable; +import java.util.List; + +public class OrganizationInfo implements Serializable { + private String name; + private String shortName; + private String country; + private String websiteUrl; + private List pidsList; + private String id; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getShortName() { + return shortName; + } + + public void setShortName(String shortName) { + this.shortName = shortName; + } + + public String getCountry() { + return country; + } + + public void setCountry(String country) { + this.country = country; + } + + public String getWebsiteUrl() { + return websiteUrl; + } + + public void setWebsiteUrl(String webciteUrl) { + this.websiteUrl = webciteUrl; + } + + public List getPidsList() { + return pidsList; + } + + public void setPidsList(List pidsList) { + this.pidsList = pidsList; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/Pids.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/Pids.java index f540e161ed..26b0756239 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/Pids.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/Pids.java @@ -1,4 +1,25 @@ + package eu.dnetlib.dhp.oa.graph.dump.ttl; -public class Pids { +import java.io.Serializable; + +public class Pids implements Serializable { + private String type; + private String value; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/SparkPrepareOrganizationInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/SparkPrepareOrganizationInfo.java index 293d91671a..c64dfd4d42 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/SparkPrepareOrganizationInfo.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/ttl/SparkPrepareOrganizationInfo.java @@ -1,4 +1,88 @@ + package eu.dnetlib.dhp.oa.graph.dump.ttl; -public class SparkPrepareOrganizationInfo { +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.oaf.Organization; +import scala.Tuple2; + +public class SparkPrepareOrganizationInfo implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkPrepareOrganizationInfo.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkPrepareOrganizationInfo.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_whole/input_organization_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkHiveSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + prepareOrganization(spark, inputPath, outputPath); + + }); + + } + + private static void prepareOrganization(SparkSession spark, String inputPath, String outputPath) { + Dataset orgs = Utils.readPath(spark, inputPath, Organization.class); + + orgs.createOrReplaceTempView("organization"); + + String query = "select country.classname country, legalname.value name, legalshortname.value shortName, websiteurl.value websiteUrl, " + + + "collect_set(named_struct(\"pidtype\", pIde.qualifier.classid, \"pid\", pIde.value)) as pidsList, id " + + "from organization " + + "lateral view explode( pid) p as pIde " + + "group by country.classname, legalname.value, legalshortname.value, websiteurl.value, id"; + + spark + .sql(query) + .as(Encoders.bean(OrganizationInfo.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); + + } + }