From 6d0f08277b3a643632e8f68da141575ec21a9f4e Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 29 Jul 2020 17:03:19 +0200
Subject: [PATCH] classes to implement the dump of the whole graph.

---
 .../dhp/oa/graph/dump/graph/Constants.java | 30 +
 .../dhp/oa/graph/dump/graph/ContextInfo.java | 78 ++-
 .../dump/graph/CreateContextEntities.java | 127 ++--
 .../dump/graph/CreateContextRelation.java | 110 +++-
 .../graph/dump/graph/DumpGraphEntities.java | 612 ++++++++++++------
 .../dhp/oa/graph/dump/graph/Extractor.java | 207 ++++++
 .../oa/graph/dump/graph/OrganizationMap.java | 2 +-
 .../dhp/oa/graph/dump/graph/Process.java | 71 ++
 .../dump/graph/QueryInformationSystem.java | 110 +++-
 .../graph/dump/graph/SparkCollectAndSave.java | 83 +++
 .../dump/graph/SparkDumpEntitiesJob.java | 103 ++-
 .../dump/graph/SparkDumpRelationJob.java | 106 ++-
 .../SparkExtractRelationFromEntities.java | 60 +-
 .../dump/graph/SparkOrganizationRelation.java | 145 ++++-
 14 files changed, 1510 insertions(+), 334 deletions(-)
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java
 create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java
new file mode 100644
index 000000000..c919ecd97
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java
@@ -0,0 +1,30 @@
+
+package eu.dnetlib.dhp.oa.graph.dump.graph;
+
+import java.io.Serializable;
+
+public class Constants implements Serializable {
+	// collectedFrom goes with isProvidedBy -> taken from ModelSupport
+
+	public static final String IS_HOSTED_BY = "isHostedBy";
+	public static final String HOSTS = "hosts";
+
+	public static final String IS_FUNDED_BY = "isFundedBy";
+	public static final String FUNDS = "funds";
+
+	public static final String FUNDINGS = "fundings";
+
+	// community-result relations use isRelatedTo
+
+	public static final String RESULT_ENTITY = "result";
+	public static final String DATASOURCE_ENTITY = "datasource";
+	public static final String CONTEXT_ENTITY = "context";
+	public static final String ORGANIZATION_ENTITY = "organization";
+	public static final String PROJECT_ENTITY = "project";
+
+	public static final String CONTEXT_ID = "00";
+	public static final String CONTEXT_NS_PREFIX = "context____";
+
+	public static final String HARVESTED = "Harvested";
+	public static final String DEFAULT_TRUST = "0.9";
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/ContextInfo.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/ContextInfo.java
index aaaf4bb86..041dbf5f7 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/ContextInfo.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/ContextInfo.java
@@ -1,4 +1,80 @@
+
 package eu.dnetlib.dhp.oa.graph.dump.graph;
-public class ContextInfo {
+import java.io.Serializable;
+import java.util.List;
+
+public class ContextInfo implements Serializable {
+	private String 
id; + private String description; + private String type; + private String zenodocommunity; + private String name; + private List projectList; + private List datasourceList; + private List subject; + + public List getSubject() { + return subject; + } + + public void setSubject(List subject) { + this.subject = subject; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getZenodocommunity() { + return zenodocommunity; + } + + public void setZenodocommunity(String zenodocommunity) { + this.zenodocommunity = zenodocommunity; + } + + public List getProjectList() { + return projectList; + } + + public void setProjectList(List projectList) { + this.projectList = projectList; + } + + public List getDatasourceList() { + return datasourceList; + } + + public void setDatasourceList(List datasourceList) { + this.datasourceList = datasourceList; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java index 065f26476..80d47cc4e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextEntities.java @@ -1,77 +1,104 @@ + package eu.dnetlib.dhp.oa.graph.dump.graph; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; -import eu.dnetlib.dhp.oa.graph.dump.Utils; +import javax.rmi.CORBA.Util; -import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchCommunity; -import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchInitiative; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Serializable; -import java.util.Optional; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchCommunity; +import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchInitiative; -public class SparkCreateContextEntities implements Serializable { +public class CreateContextEntities implements Serializable { - //leggo i context dall'is e mi faccio la mappa id -> contextinfo + // leggo i context dall'is e mi faccio la mappa id -> contextinfo - //creo le entities con le info generali + // creo le entities con le info generali - //creo le relazioni con le info in projectList e datasourceList. 
Le relazioni sono di tipo isRelatedTo da una parte e dall'altra + private static final Logger log = LoggerFactory.getLogger(CreateContextEntities.class); + private final Configuration conf; + private final BufferedWriter writer; - //prendo un parametro community_organization per creare relazioni fra community ed organizzazioni . n.b. l'id dell'organizzazione - //e' non deduplicato => bisogna risolverlo e prendere i dedup id distinti + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + CreateContextEntities.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_whole/input_entity_parameters.json")); - private static final Logger log = LoggerFactory.getLogger(SparkCreateContextEntities.class); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkCreateContextEntities.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + final String hdfsPath = parser.get("hdfsPath"); + log.info("hdfsPath: {}", hdfsPath); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + final String hdfsNameNode = parser.get("nameNode"); + log.info("nameNode: {}", hdfsNameNode); - final String inputPath = parser.get("sourcePath"); - log.info("inputPath: {}", inputPath); + final String isLookUpUrl = parser.get("isLookUpUrl"); + log.info("isLookUpUrl: {}", isLookUpUrl); - final String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + final CreateContextEntities cce = new CreateContextEntities(hdfsPath, hdfsNameNode); - final String isLookUpUrl = parser.get("isLookUpUrl"); - log.info("isLookUpUrl: {}", isLookUpUrl); + log.info("Processing contexts..."); + cce.execute(Process::getEntity, isLookUpUrl); - QueryInformationSystem queryInformationSystem = new QueryInformationSystem(); - queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl)); - CommunityMap communityMap = queryInformationSystem.getCommunityMap(); + } - createEntities(communityMap, outputPath + "/context"); + public CreateContextEntities(String hdfsPath, String hdfsNameNode) throws IOException { + this.conf = new Configuration(); + this.conf.set("fs.defaultFS", hdfsNameNode); + FileSystem fileSystem = FileSystem.get(this.conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fsDataOutputStream = fileSystem.append(hdfsWritePath); + } else { + fsDataOutputStream = fileSystem.create(hdfsWritePath); + } - } + this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); - private static void createEntities(CommunityMap communityMap, String s) { - communityMap.keySet().stream() - .map(key -> { - ResearchInitiative community; - ContextInfo cinfo = communityMap.get(key); - if(cinfo.getType().equals("community")){ - community = new ResearchCommunity(); - }else{ - community = new ResearchInitiative(); - } - return community; - }) - } + } + public void execute(final Function producer, String isLookUpUrl) + throws Exception { + + 
QueryInformationSystem queryInformationSystem = new QueryInformationSystem(); + queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl)); + + final Consumer consumer = ci -> writeEntity(producer.apply(ci)); + + queryInformationSystem.getContextInformation(consumer); + } + + + + protected void writeEntity(final R r) { + try { + writer.write(Utils.OBJECT_MAPPER.writeValueAsString(r)); + writer.newLine(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextRelation.java index 8b45bd8e7..2067499b6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextRelation.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/CreateContextRelation.java @@ -1,4 +1,112 @@ + package eu.dnetlib.dhp.oa.graph.dump.graph; -public class CreateContextRelation { +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; + +import eu.dnetlib.dhp.schema.oaf.Datasource; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.dump.oaf.graph.*; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; + +public class CreateContextRelation implements Serializable { + private static final Logger log = LoggerFactory.getLogger(CreateContextEntities.class); + private final Configuration conf; + private final BufferedWriter writer; + private final QueryInformationSystem queryInformationSystem; + + private static final String CONTEX_RELATION_DATASOURCE = "contentproviders"; + private static final String CONTEX_RELATION_PROJECT = "projects"; + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + CreateContextRelation.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_whole/input_entity_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String hdfsPath = parser.get("hdfsPath"); + log.info("hdfsPath: {}", hdfsPath); + + final String hdfsNameNode = parser.get("nameNode"); + log.info("nameNode: {}", hdfsNameNode); + + final String isLookUpUrl = parser.get("isLookUpUrl"); + log.info("isLookUpUrl: {}", isLookUpUrl); + + final CreateContextRelation cce = new CreateContextRelation(hdfsPath, hdfsNameNode, isLookUpUrl); + + log.info("Creating relation for datasource..."); + cce.execute(Process::getRelation, CONTEX_RELATION_DATASOURCE, ModelSupport.getIdPrefix(Datasource.class)); + + 
log.info("Creating relations for projects... "); + cce.execute(Process::getRelation, CONTEX_RELATION_PROJECT, ModelSupport.getIdPrefix(eu.dnetlib.dhp.schema.oaf.Project.class)); + + } + + public CreateContextRelation(String hdfsPath, String hdfsNameNode, String isLookUpUrl) + throws IOException, ISLookUpException { + this.conf = new Configuration(); + this.conf.set("fs.defaultFS", hdfsNameNode); + + queryInformationSystem = new QueryInformationSystem(); + queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl)); + queryInformationSystem.execContextRelationQuery(); + + FileSystem fileSystem = FileSystem.get(this.conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fsDataOutputStream = fileSystem.append(hdfsWritePath); + } else { + fsDataOutputStream = fileSystem.create(hdfsWritePath); + } + + this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); + + } + + public void execute(final Function> producer, String category, String prefix) throws Exception { + + final Consumer consumer = ci -> producer.apply(ci).forEach(c -> writeEntity(c)); + + queryInformationSystem.getContextRelation(consumer, category, prefix); + } + + + protected void writeEntity(final Relation r) { + try { + writer.write(Utils.OBJECT_MAPPER.writeValueAsString(r)); + writer.newLine(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java index bdddb807e..296d5e02a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java @@ -1,16 +1,18 @@ + package eu.dnetlib.dhp.oa.graph.dump.graph; -import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.dump.oaf.ControlledField; -import eu.dnetlib.dhp.schema.dump.oaf.Country; -import eu.dnetlib.dhp.schema.dump.oaf.KeyValue; -import eu.dnetlib.dhp.schema.dump.oaf.Qualifier; -import eu.dnetlib.dhp.schema.dump.oaf.graph.*; -import eu.dnetlib.dhp.schema.oaf.Field; -import eu.dnetlib.dhp.schema.oaf.OafEntity; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.io.Serializable; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import eu.dnetlib.dhp.schema.dump.oaf.*; +import eu.dnetlib.dhp.schema.oaf.Journal; import org.apache.spark.SparkConf; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -20,234 +22,452 @@ import org.dom4j.DocumentException; import org.dom4j.Node; import org.dom4j.io.SAXReader; -import java.io.Serializable; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; +import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; +import 
eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.dump.oaf.graph.*; +import eu.dnetlib.dhp.schema.oaf.Field; +import eu.dnetlib.dhp.schema.oaf.OafEntity; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import javax.swing.text.html.Option; -public class DumpGraph implements Serializable { +public class DumpGraphEntities implements Serializable { + + public void run(Boolean isSparkSessionManaged, + String inputPath, + String outputPath, + Class inputClazz, + CommunityMap communityMap) { + + SparkConf conf = new SparkConf(); + switch (ModelSupport.idPrefixMap.get(inputClazz)) { + case "50": + DumpProducts d = new DumpProducts(); + d.run(isSparkSessionManaged, inputPath, outputPath, communityMap, inputClazz, Result.class, true); + break; + case "40": + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + projectMap(spark, inputPath, outputPath, inputClazz); + + }); + break; + case "20": + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + organizationMap(spark, inputPath, outputPath, inputClazz); + + }); + break; + case "10": + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + datasourceMap(spark, inputPath, outputPath, inputClazz); + + }); + break; + } + + } + + private static void datasourceMap(SparkSession spark, String inputPath, String outputPath, Class inputClazz) { + Utils + .readPath(spark, inputPath, inputClazz) + .map(d -> mapDatasource((eu.dnetlib.dhp.schema.oaf.Datasource) d), Encoders.bean(Datasource.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } - public void run(Boolean isSparkSessionManaged, - String inputPath, - String outputPath, - Class inputClazz, - CommunityMap communityMap) { - SparkConf conf = new SparkConf(); - switch (ModelSupport.idPrefixMap.get(inputClazz)){ - case "50": - DumpProducts d = new DumpProducts(); - d.run(isSparkSessionManaged,inputPath,outputPath,communityMap, inputClazz, true); - break; - case "40": - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - projectMap(spark, inputPath, outputPath, inputClazz); + private static void projectMap(SparkSession spark, String inputPath, String outputPath, + Class inputClazz) { + Utils + .readPath(spark, inputPath, inputClazz) + .map(p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project) p), Encoders.bean(Project.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } - }); - break; - case "20": - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - Utils.removeOutputDir(spark, outputPath); - organizationMap(spark, inputPath, outputPath, inputClazz); + private static Datasource mapDatasource(eu.dnetlib.dhp.schema.oaf.Datasource d) { + Datasource datasource = new Datasource(); - }); - break; - } - } + datasource.setId(d.getId()); - private static void projectMap(SparkSession spark, String inputPath, String outputPath, Class inputClazz) { - Utils.readPath(spark, inputPath, inputClazz) - .map(p -> mapProject((eu.dnetlib.dhp.schema.oaf.Project)p), Encoders.bean(Project.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath); - } + Optional.ofNullable(d.getOriginalId()).ifPresent(oId -> datasource.setOriginalId(oId)); - private static Project 
mapProject(eu.dnetlib.dhp.schema.oaf.Project p) { - Project project = new Project(); + Optional.ofNullable(d.getPid()) + .ifPresent(pids -> pids.stream().map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue())) + .collect(Collectors.toList())); - project.setCollectedfrom(Optional.ofNullable(p.getCollectedfrom()) - .map(cf -> cf.stream().map(coll -> coll.getValue()).collect(Collectors.toList())) - .orElse(new ArrayList<>())); + Optional.ofNullable(d.getDatasourcetype()) + .ifPresent(dsType -> datasource.setDatasourcetype(ControlledField.newInstance(dsType.getClassid(), dsType.getClassname()))); - Optional.ofNullable(p.getId()) - .ifPresent(id -> project.setId(id)); + Optional.ofNullable(d.getOpenairecompatibility()) + .ifPresent(v -> datasource.setOpenairecompatibility(v.getClassname())); - Optional.ofNullable(p.getWebsiteurl()) - .ifPresent(w -> project.setWebsiteurl(w.getValue())); + Optional.ofNullable(d.getOfficialname()) + .ifPresent(oname -> datasource.setOfficialname(oname.getValue())); - Optional.ofNullable(p.getCode()) - .ifPresent(code -> project.setCode(code.getValue())); + Optional.ofNullable(d.getEnglishname()) + .ifPresent(ename -> datasource.setEnglishname(ename.getValue())); - Optional.ofNullable(p.getAcronym()) - .ifPresent(acronynim -> project.setAcronym(acronynim.getValue())); + Optional.ofNullable(d.getWebsiteurl()) + .ifPresent(wsite -> datasource.setWebsiteurl(wsite.getValue())); - Optional.ofNullable(p.getTitle()) - .ifPresent(title -> project.setTitle(title.getValue())); + Optional.ofNullable(d.getLogourl()) + .ifPresent(lurl -> datasource.setLogourl(lurl.getValue())); - Optional.ofNullable(p.getStartdate()) - .ifPresent(sdate -> project.setStartdate(sdate.getValue())); + Optional.ofNullable(d.getDateofvalidation()) + .ifPresent(dval -> datasource.setDateofvalidation(dval.getValue())); - Optional.ofNullable(p.getEnddate()) - .ifPresent(edate -> project.setEnddate(edate.getValue())); + Optional.ofNullable(d.getDescription()) + .ifPresent(dex -> datasource.setDescription(dex.getValue())); - Optional.ofNullable(p.getCallidentifier()) - .ifPresent(cide -> project.setCallidentifier(cide.getValue())); + Optional.ofNullable(d.getSubjects()) + .ifPresent(sbjs -> datasource.setSubjects(sbjs.stream().map(sbj -> sbj.getValue()).collect(Collectors.toList()))); - Optional.ofNullable(p.getKeywords()) - .ifPresent(key -> project.setKeywords(key.getValue())); + Optional.ofNullable(d.getOdpolicies()) + .ifPresent(odp->datasource.setPolicies(Arrays.asList(odp.getValue()))); - Optional.ofNullable(p.getDuration()) - .ifPresent(duration -> project.setDuration(duration.getValue())); + Optional.ofNullable(d.getOdlanguages()) + .ifPresent(langs -> datasource.setLanguages(langs.stream().map(lang -> lang.getValue()).collect(Collectors.toList()))); - Optional> omandate = Optional.ofNullable(p.getOamandatepublications()); - Optional> oecsc39 = Optional.ofNullable(p.getEcsc39()); - boolean mandate = false; - if(omandate.isPresent()){ - if (!omandate.get().getValue().equals("N")){ - mandate = true; - } - } - if(oecsc39.isPresent()){ - if(!oecsc39.get().getValue().equals("N")){ - mandate = true; - } - } + Optional.ofNullable(d.getOdcontenttypes()) + .ifPresent(ctypes -> datasource.setContenttypes(ctypes.stream().map(ctype -> ctype.getValue()).collect(Collectors.toList()))); - project.setOpenaccessmandateforpublications(mandate); - project.setOpenaccessmandatefordataset(false); + Optional.ofNullable(d.getReleasestartdate()) + .ifPresent(rd -> 
datasource.setReleasestartdate(rd.getValue())); - Optional.ofNullable(p.getEcarticle29_3()) - .ifPresent(oamandate -> project.setOpenaccessmandatefordataset(oamandate.getValue().equals("Y"))); + Optional.ofNullable(d.getReleaseenddate()) + .ifPresent(ed -> datasource.setReleaseenddate(ed.getValue())); - project.setSubject(Optional.ofNullable(p.getSubjects()) - .map(subjs -> subjs.stream().map(s -> s.getValue()).collect(Collectors.toList())) - .orElse(new ArrayList<>())); + Optional.ofNullable(d.getMissionstatementurl()) + .ifPresent(ms -> datasource.setMissionstatementurl(ms.getValue())); - Optional.ofNullable(p.getSummary()) - .ifPresent(summary -> project.setSummary(summary.getValue())); + Optional.ofNullable(d.getDatabaseaccesstype()) + .ifPresent(ar -> datasource.setAccessrights(ar.getValue())); - Optional ofundedamount = Optional.ofNullable(p.getFundedamount()); - Optional> ocurrency = Optional.ofNullable(p.getCurrency()); - Optional ototalcost = Optional.ofNullable(p.getTotalcost()); + Optional.ofNullable(d.getDatauploadtype()) + .ifPresent(dut -> datasource.setUploadrights(dut.getValue())); - if(ocurrency.isPresent()){ - if(ofundedamount.isPresent()){ - if(ototalcost.isPresent()){ - project.setGranted(Granted.newInstance(ocurrency.get().getValue(), ototalcost.get(), ofundedamount.get())); - }else{ - project.setGranted(Granted.newInstance(ocurrency.get().getValue(), ofundedamount.get())); - } - } - } + Optional.ofNullable(d.getDatabaseaccessrestriction()) + .ifPresent(dar ->datasource.setDatabaseaccessrestriction(dar.getValue())); - project.setProgramme(Optional.ofNullable(p.getProgramme()) - .map(programme -> programme.stream().map(pg -> Programme.newInstance(pg.getCode(), pg.getDescription())) - .collect(Collectors.toList())) - .orElse(new ArrayList<>())); + Optional.ofNullable(d.getDatauploadrestriction()) + .ifPresent(dur -> datasource.setDatauploadrestriction(dur.getValue())); - project.setFunding(Optional.ofNullable(p.getFundingtree()) - .map(value -> value.stream() - .map(fundingtree -> getFunder(fundingtree.getValue())).collect(Collectors.toList())) - .orElse(new ArrayList<>())); - return project; - } + Optional.ofNullable(d.getVersioning()) + .ifPresent(v->datasource.setVersioning(v.getValue())); - public static Funder getFunder(String fundingtree){ + Optional.ofNullable(d.getCitationguidelineurl()) + .ifPresent(cu -> datasource.setCitationguidelineurl(cu.getValue())); - Funder f = new Funder(); - final Document doc; - try { - doc = new SAXReader().read(new StringReader(fundingtree)); - f.setShortName(((org.dom4j.Node) (doc.selectNodes("//funder/shortname").get(0))).getText()); - f.setName(((org.dom4j.Node) (doc.selectNodes("//funder/name").get(0))).getText()); - f.setJurisdiction(((org.dom4j.Node) (doc.selectNodes("//funder/jurisdiction").get(0))).getText()); - f.setId(((org.dom4j.Node) (doc.selectNodes("//funder/id").get(0))).getText()); - List fundings = new ArrayList<>(); - int level = 0; - List nodes = doc.selectNodes("//funding_level_"+level); - while(nodes.size() > 0 ) { - for(org.dom4j.Node n: nodes) { - Levels funding_stream = new Levels(); - funding_stream.setLevel(String.valueOf(level)); - List node = n.selectNodes("./name"); - funding_stream.setName(((org.dom4j.Node)node.get(0)).getText()); - node = n.selectNodes("./id"); - funding_stream.setId(((org.dom4j.Node)node.get(0)).getText()); - node = n.selectNodes("./description"); - funding_stream.setDescription(((Node)node.get(0)).getText()); - fundings.add(funding_stream); + Optional.ofNullable(d.getPidsystems()) + 
.ifPresent(ps -> datasource.setPidsystems(ps.getValue())); - } - level += 1; - nodes = doc.selectNodes("//funding_level_"+level); - } - if(fundings.size() > 0 ) { - f.setFunding_levels(fundings); - } + Optional.ofNullable(d.getCertificates()) + .ifPresent(c -> datasource.setCertificates(c.getValue())); - return f; - } catch (DocumentException e) { - e.printStackTrace(); - } - return f; + Optional.ofNullable(d.getPolicies()) + .ifPresent(ps -> datasource.setPolicies(ps.stream().map(p -> p.getValue()).collect(Collectors.toList()))); - } + Optional.ofNullable(d.getJournal()) + .ifPresent(j -> datasource.setJournal(getContainer(j))); - private static void organizationMap(SparkSession spark, String inputPath, String outputPath, Class inputClazz) { - Utils.readPath(spark, inputPath,inputClazz) - .map(o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization)o), Encoders.bean(Organization.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression","gzip") - .json(outputPath); - } + return datasource; - private static Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization org){ - Organization organization = new Organization(); + } - Optional.ofNullable(org.getLegalshortname()) - .ifPresent(value -> organization.setLegalshortname(value.getValue())); + private static Container getContainer(Journal j) { + Container c = new Container(); - Optional.ofNullable(org.getLegalname()) - .ifPresent(value -> organization.setLegalname(value.getValue())); + Optional.ofNullable(j.getName()) + .ifPresent(n->c.setName(n)); - Optional.ofNullable(org.getWebsiteurl()) - .ifPresent(value -> organization.setWebsiteurl(value.getValue())); + Optional.ofNullable(j.getIssnPrinted()) + .ifPresent(issnp -> c.setIssnPrinted(issnp)); - Optional.ofNullable(org.getAlternativeNames()) - .ifPresent(value -> organization.setAlternativenames(value.stream() - .map( v-> v.getValue()).collect(Collectors.toList()))); + Optional.ofNullable(j.getIssnOnline()) + .ifPresent(issno -> c.setIssnOnline(issno)); - Optional.ofNullable(org.getCountry()) - .ifPresent(value -> organization.setCountry(Qualifier.newInstance(value.getClassid(), value.getClassname()))); + Optional.ofNullable(j.getIssnLinking()) + .ifPresent(isnl -> c.setIssnLinking(isnl)); - Optional.ofNullable(org.getId()) - .ifPresent(value -> organization.setId(value)); + Optional.ofNullable(j.getEp()) + .ifPresent(ep -> c.setEp(ep)); - Optional.ofNullable(org.getPid()) - .ifPresent(value -> organization.setPid( - value.stream().map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue())).collect(Collectors.toList()) - )); + Optional.ofNullable(j.getIss()) + .ifPresent(iss -> c.setIss(iss)); - organization.setCollectedfrom(Optional.ofNullable(org.getCollectedfrom()) - .map(value -> value.stream() - .map(cf -> KeyValue.newInstance(cf.getKey(),cf.getValue())).collect(Collectors.toList())) - .orElse(new ArrayList<>())); + Optional.ofNullable(j.getSp()) + .ifPresent(sp -> c.setSp(sp)); - return organization; - } + Optional.ofNullable(j.getVol()) + .ifPresent(vol -> c.setVol(vol)); + + Optional.ofNullable(j.getEdition()) + .ifPresent(edition -> c.setEdition(edition)); + + Optional.ofNullable(j.getConferencedate()) + .ifPresent(cdate -> c.setConferencedate(cdate)); + + Optional.ofNullable(j.getConferenceplace()) + .ifPresent(cplace -> c.setConferenceplace(cplace)); + + return c; + } + + private static Project mapProject(eu.dnetlib.dhp.schema.oaf.Project p) { + Project project = new Project(); + +// project +// .setCollectedfrom( +// Optional 
+// .ofNullable(p.getCollectedfrom()) +// .map( +// cf -> cf +// .stream() +// .map(coll -> KeyValue.newInstance(coll.getKey(), coll.getValue())) +// .collect(Collectors.toList())) +// .orElse(new ArrayList<>())); + + Optional + .ofNullable(p.getId()) + .ifPresent(id -> project.setId(id)); + + Optional + .ofNullable(p.getWebsiteurl()) + .ifPresent(w -> project.setWebsiteurl(w.getValue())); + + Optional + .ofNullable(p.getCode()) + .ifPresent(code -> project.setCode(code.getValue())); + + Optional + .ofNullable(p.getAcronym()) + .ifPresent(acronynim -> project.setAcronym(acronynim.getValue())); + + Optional + .ofNullable(p.getTitle()) + .ifPresent(title -> project.setTitle(title.getValue())); + + Optional + .ofNullable(p.getStartdate()) + .ifPresent(sdate -> project.setStartdate(sdate.getValue())); + + Optional + .ofNullable(p.getEnddate()) + .ifPresent(edate -> project.setEnddate(edate.getValue())); + + Optional + .ofNullable(p.getCallidentifier()) + .ifPresent(cide -> project.setCallidentifier(cide.getValue())); + + Optional + .ofNullable(p.getKeywords()) + .ifPresent(key -> project.setKeywords(key.getValue())); + + Optional + .ofNullable(p.getDuration()) + .ifPresent(duration -> project.setDuration(duration.getValue())); + + Optional> omandate = Optional.ofNullable(p.getOamandatepublications()); + Optional> oecsc39 = Optional.ofNullable(p.getEcsc39()); + boolean mandate = false; + if (omandate.isPresent()) { + if (omandate.get().getValue().equals("true")) { + mandate = true; + } + } + if (oecsc39.isPresent()) { + if (oecsc39.get().getValue().equals("true")) { + mandate = true; + } + } + + project.setOpenaccessmandateforpublications(mandate); + project.setOpenaccessmandatefordataset(false); + + Optional + .ofNullable(p.getEcarticle29_3()) + .ifPresent(oamandate -> project.setOpenaccessmandatefordataset(oamandate.getValue().equals("true"))); + + project + .setSubject( + Optional + .ofNullable(p.getSubjects()) + .map(subjs -> subjs.stream().map(s -> s.getValue()).collect(Collectors.toList())) + .orElse(new ArrayList<>())); + + Optional + .ofNullable(p.getSummary()) + .ifPresent(summary -> project.setSummary(summary.getValue())); + + Optional ofundedamount = Optional.ofNullable(p.getFundedamount()); + Optional> ocurrency = Optional.ofNullable(p.getCurrency()); + Optional ototalcost = Optional.ofNullable(p.getTotalcost()); + + if (ocurrency.isPresent()) { + if (ofundedamount.isPresent()) { + if (ototalcost.isPresent()) { + project + .setGranted( + Granted.newInstance(ocurrency.get().getValue(), ototalcost.get(), ofundedamount.get())); + } else { + project.setGranted(Granted.newInstance(ocurrency.get().getValue(), ofundedamount.get())); + } + } + } + + project + .setProgramme( + Optional + .ofNullable(p.getProgramme()) + .map( + programme -> programme + .stream() + .map(pg -> Programme.newInstance(pg.getCode(), pg.getDescription())) + .collect(Collectors.toList())) + .orElse(new ArrayList<>())); + + project + .setFunding( + Optional + .ofNullable(p.getFundingtree()) + .map( + value -> value + .stream() + .map(fundingtree -> getFunder(fundingtree.getValue())) + .collect(Collectors.toList())) + .orElse(new ArrayList<>())); + return project; + } + + public static Funder getFunder(String fundingtree) { + + Funder f = new Funder(); + final Document doc; + try { + doc = new SAXReader().read(new StringReader(fundingtree)); + f.setShortName(((org.dom4j.Node) (doc.selectNodes("//funder/shortname").get(0))).getText()); + f.setName(((org.dom4j.Node) (doc.selectNodes("//funder/name").get(0))).getText()); 
+ f.setJurisdiction(((org.dom4j.Node) (doc.selectNodes("//funder/jurisdiction").get(0))).getText()); + // f.setId(((org.dom4j.Node) (doc.selectNodes("//funder/id").get(0))).getText()); + + String id = ""; + String description = ""; + // List fundings = new ArrayList<>(); + int level = 0; + List nodes = doc.selectNodes("//funding_level_" + level); + while (nodes.size() > 0) { + for (org.dom4j.Node n : nodes) { +// Levels funding_stream = new Levels(); +// funding_stream.setLevel(String.valueOf(level)); +// List node = n.selectNodes("./name"); + // funding_stream.setName(((org.dom4j.Node)node.get(0)).getText()); + List node = n.selectNodes("./id"); + id = ((org.dom4j.Node) node.get(0)).getText(); + id = id.substring(id.indexOf("::") + 2); + // funding_stream.setId(((org.dom4j.Node)node.get(0)).getText()); + node = n.selectNodes("./description"); + description += ((Node) node.get(0)).getText() + " - "; +// funding_stream.setDescription(((Node)node.get(0)).getText()); +// fundings.add(funding_stream); + + } + level += 1; + nodes = doc.selectNodes("//funding_level_" + level); + } +// if(fundings.size() > 0 ) { +// f.setFunding_levels(fundings); +// } + if (!id.equals("")) { + Fundings fundings = new Fundings(); + fundings.setId(id); + fundings.setDescription(description.substring(0, description.length() - 3).trim()); + f.setFunding_stream(fundings); + } + + return f; + } catch (DocumentException e) { + e.printStackTrace(); + } + return f; + + } + + private static void organizationMap(SparkSession spark, String inputPath, String outputPath, + Class inputClazz) { + Utils + .readPath(spark, inputPath, inputClazz) + .map(o -> mapOrganization((eu.dnetlib.dhp.schema.oaf.Organization) o), Encoders.bean(Organization.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath); + } + + private static Organization mapOrganization(eu.dnetlib.dhp.schema.oaf.Organization org) { + Organization organization = new Organization(); + + Optional + .ofNullable(org.getLegalshortname()) + .ifPresent(value -> organization.setLegalshortname(value.getValue())); + + Optional + .ofNullable(org.getLegalname()) + .ifPresent(value -> organization.setLegalname(value.getValue())); + + Optional + .ofNullable(org.getWebsiteurl()) + .ifPresent(value -> organization.setWebsiteurl(value.getValue())); + + Optional + .ofNullable(org.getAlternativeNames()) + .ifPresent( + value -> organization + .setAlternativenames( + value + .stream() + .map(v -> v.getValue()) + .collect(Collectors.toList()))); + + Optional + .ofNullable(org.getCountry()) + .ifPresent( + value -> organization.setCountry(Qualifier.newInstance(value.getClassid(), value.getClassname()))); + + Optional + .ofNullable(org.getId()) + .ifPresent(value -> organization.setId(value)); + + Optional + .ofNullable(org.getPid()) + .ifPresent( + value -> organization + .setPid( + value + .stream() + .map(p -> ControlledField.newInstance(p.getQualifier().getClassid(), p.getValue())) + .collect(Collectors.toList()))); + + + + return organization; + } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java new file mode 100644 index 000000000..ae77f7dfc --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java @@ -0,0 +1,207 @@ +package eu.dnetlib.dhp.oa.graph.dump.graph; + +import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; +import 
eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.dump.oaf.Provenance; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Node; +import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.oaf.Project; +import eu.dnetlib.dhp.schema.oaf.Result; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; + +import java.io.Serializable; +import java.util.*; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class Extractor implements Serializable { + + public void run(Boolean isSparkSessionManaged, + String inputPath, + String outputPath, + Class inputClazz, + CommunityMap communityMap) { + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + extractRelationResult(spark, inputPath, outputPath, inputClazz, communityMap); + }); + } + + + +// private static void extractRelationProjects(SparkSession spark, String inputPath, String outputPath){ +// Utils.readPath(spark, inputPath, Project.class) +// .flatMap((FlatMapFunction) project ->{ +// List relList = new ArrayList<>(); +// Optional.ofNullable(project.getCollectedfrom()) +// .ifPresent(cfl -> +// cfl.forEach(cf -> { +// Provenance provenance = Provenance.newInstance(cf.getDataInfo().getProvenanceaction().getClassname(), +// cf.getDataInfo().getTrust()); +// +// relList.add(getRelation(project.getId(), cf.getKey(), +// Constants.PROJECT_ENTITY, Constants.DATASOURCE_ENTITY, Constants.IS_FUNDED_BY, +// Constants.FUNDINGS, provenance)); +// relList.add(getRelation(cf.getKey(), project.getId(), +// Constants.DATASOURCE_ENTITY, Constants.PROJECT_ENTITY, Constants.FUNDS, +// Constants.FUNDINGS, provenance)); +// })); +// return relList.iterator(); +// }, Encoders.bean(Relation.class)) +// .write() +// .option("Compression", "gzip") +// .mode(SaveMode.Append) +// .json(outputPath); +// } + + + private void extractRelationResult(SparkSession spark, + String inputPath, + String outputPath, + Class inputClazz, + CommunityMap communityMap) { + + Set hashCodes = new HashSet<>(); + + Utils + .readPath(spark, inputPath, inputClazz) + .flatMap((FlatMapFunction) value -> { + List relationList = new ArrayList<>(); + Optional + .ofNullable(value.getInstance()) + .ifPresent(inst -> inst.forEach(instance -> { + Optional + .ofNullable(instance.getCollectedfrom()) + .ifPresent(cf -> + getRelatioPair(value, relationList, cf, + ModelConstants.IS_PROVIDED_BY, ModelConstants.PROVIDES, hashCodes) + ); + Optional + .ofNullable(instance.getHostedby()) + .ifPresent(hb -> getRelatioPair(value, relationList, hb, + Constants.IS_HOSTED_BY, Constants.HOSTS , hashCodes) ); + })); + Set communities = communityMap.keySet(); + Optional + .ofNullable(value.getContext()) + .ifPresent(contexts -> contexts.forEach(context -> { + String id = context.getId(); + if (id.contains(":")) { + id = id.substring(0, id.indexOf(":")); + } + if (communities.contains(id)) { + String contextId = Utils.getContextId(id); + Provenance provenance = Optional + 
.ofNullable(context.getDataInfo()) + .map( + dinfo -> Optional + .ofNullable(dinfo.get(0).getProvenanceaction()) + .map( + paction -> Provenance + .newInstance( + paction.getClassid(), + dinfo.get(0).getTrust())) + .orElse(null)) + .orElse(null); + Relation r = getRelation( + value.getId(), contextId, + Constants.RESULT_ENTITY, + Constants.CONTEXT_ENTITY, + ModelConstants.RELATIONSHIP, ModelConstants.IS_RELATED_TO, provenance); + if(!hashCodes.contains(r.hashCode())){ + relationList + .add(r); + hashCodes.add(r.hashCode()); + } + r = getRelation( + contextId, value.getId(), + Constants.CONTEXT_ENTITY, + Constants.RESULT_ENTITY, + ModelConstants.RELATIONSHIP, + ModelConstants.IS_RELATED_TO, provenance); + if(!hashCodes.contains(r.hashCode())){ + relationList + .add( + r); + hashCodes.add(r.hashCode()); + } + + } + + })); + + return relationList.iterator(); + }, Encoders.bean(Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); + + } + + private static void getRelatioPair(R value, List relationList, KeyValue cf, + String result_dtasource, String datasource_result, + Set hashCodes) { + Provenance provenance = Optional + .ofNullable(cf.getDataInfo()) + .map( + dinfo -> Optional + .ofNullable(dinfo.getProvenanceaction()) + .map( + paction -> Provenance + .newInstance( + paction.getClassid(), + dinfo.getTrust())) + .orElse(Provenance.newInstance(Constants.HARVESTED, Constants.DEFAULT_TRUST))) + .orElse(Provenance.newInstance(Constants.HARVESTED, Constants.DEFAULT_TRUST)); + Relation r = getRelation( + value.getId(), + cf.getKey(), Constants.RESULT_ENTITY, Constants.DATASOURCE_ENTITY, + result_dtasource, ModelConstants.PROVISION, + provenance); + if(!hashCodes.contains(r.hashCode())){ + relationList + .add(r); + hashCodes.add(r.hashCode()); + } + + r = getRelation( + cf.getKey(), value.getId(), + Constants.DATASOURCE_ENTITY, Constants.RESULT_ENTITY, + datasource_result, ModelConstants.PROVISION, + provenance); + + if(!hashCodes.contains(r.hashCode())){ + relationList + .add(r); + hashCodes.add(r.hashCode()); + } + + } + + private static Relation getRelation(String source, String target, String sourceType, String targetType, + String relName, String relType, Provenance provenance) { + Relation r = new Relation(); + r.setSource(Node.newInstance(source, sourceType)); + r.setTarget(Node.newInstance(target, targetType)); + r.setReltype(RelType.newInstance(relName, relType)); + r.setProvenance(provenance); + return r; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/OrganizationMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/OrganizationMap.java index 7d786058a..11db7c25e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/OrganizationMap.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/OrganizationMap.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.resulttocommunityfromorganization; +package eu.dnetlib.dhp.oa.graph.dump.graph; import java.util.ArrayList; import java.util.HashMap; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java new file mode 100644 index 000000000..e4ce93673 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java @@ -0,0 +1,71 @@ +package 
eu.dnetlib.dhp.oa.graph.dump.graph; + +import eu.dnetlib.dhp.oa.graph.dump.Constants; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.dump.oaf.Provenance; +import eu.dnetlib.dhp.schema.dump.oaf.graph.*; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class Process implements Serializable { + + public static R getEntity(ContextInfo ci) { + try { + ResearchInitiative ri; + if (ci.getType().equals("community")) { + ri = new ResearchCommunity(); + ((ResearchCommunity) ri).setSubject(ci.getSubject()); + ri.setType(Constants.RESEARCH_COMMUNITY); + } else { + ri = new ResearchInitiative(); + ri.setType(Constants.RESEARCH_INFRASTRUCTURE); + } + ri.setId(Utils.getContextId(ci.getId())); + ri.setOriginalId(ci.getId()); + + ri.setDescription(ci.getDescription()); + ri.setName(ci.getName()); + ri.setZenodo_community(Constants.ZENODO_COMMUNITY_PREFIX + ci.getZenodocommunity()); + return (R) ri; + + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + public static List getRelation(ContextInfo ci) { + try { + + List relationList = new ArrayList<>(); + ci + .getDatasourceList() + .forEach(ds -> { + Relation direct = new Relation(); + Relation inverse = new Relation(); + String nodeType = ModelSupport.idPrefixEntity.get(ds.substring(0, 2)); + direct.setSource(Node.newInstance(Utils.getContextId(ci.getId()), "context")); + direct.setTarget(Node.newInstance(ds, nodeType)); + direct.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); + direct.setProvenance(Provenance.newInstance("Harvested", "09")); + relationList.add(direct); + + inverse.setTarget(Node.newInstance(Utils.getContextId(ci.getId()), "context")); + inverse.setSource(Node.newInstance(ds, nodeType)); + inverse.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP)); + inverse.setProvenance(Provenance.newInstance("Harvested", "09")); + relationList.add(inverse); + + }); + + return relationList; + + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/QueryInformationSystem.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/QueryInformationSystem.java index 8bd5173b8..43c6160fb 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/QueryInformationSystem.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/QueryInformationSystem.java @@ -1,37 +1,73 @@ -package eu.dnetlib.dhp.oa.graph.dump; +package eu.dnetlib.dhp.oa.graph.dump.graph; import java.io.StringReader; -import java.util.List; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.*; +import java.util.function.Consumer; +import java.util.stream.Collectors; -import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; +import eu.dnetlib.dhp.schema.common.ModelSupport; import org.dom4j.Document; import org.dom4j.DocumentException; import org.dom4j.Element; +import org.dom4j.Node; import org.dom4j.io.SAXReader; +import org.jetbrains.annotations.NotNull; +import eu.dnetlib.dhp.schema.dump.oaf.graph.ResearchInitiative; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; public class 
QueryInformationSystem { private ISLookUpService isLookUp; + private List contextRelationResult; private static final String XQUERY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + " where $x//CONFIGURATION/context[./@type='community' or ./@type='ri'] " + - " and ($x//context/param[./@name = 'status']/text() = 'manager' or $x//context/param[./@name = 'status']/text() = 'all') " - + + " and $x//context/param[./@name = 'status']/text() = 'all' " + " return " + - " " + - "{$x//CONFIGURATION/context/@id}" + - "{$x//CONFIGURATION/context/@label}" + - ""; + "$x//context"; - public CommunityMap getCommunityMap() - throws ISLookUpException { - return getMap(isLookUp.quickSearchProfile(XQUERY)); + private static final String XQUERY_ENTITY = "for $x in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType') " + + + "where $x//context[./@type='community' or ./@type = 'ri'] and $x//context/param[./@name = 'status']/text() = 'all' return " + + + "concat(data($x//context/@id) , '@@', $x//context/param[./@name =\"name\"]/text(), '@@', " + + "$x//context/param[./@name=\"description\"]/text(), '@@', $x//context/param[./@name = \"subject\"]/text(), '@@', " + + + "$x//context/param[./@name = \"zenodoCommunity\"]/text(), '@@', $x//context/@type)"; + public void getContextInformation(final Consumer consumer) throws ISLookUpException { + + isLookUp + .quickSearchProfile(XQUERY_ENTITY) + .forEach(c -> { + ContextInfo cinfo = new ContextInfo(); + String[] cSplit = c.split("@@"); + cinfo.setId(cSplit[0]); + cinfo.setName(cSplit[1]); + cinfo.setDescription(cSplit[2]); + if (!cSplit[3].trim().equals("")){ + cinfo.setSubject(Arrays.asList(cSplit[3].split(","))); + } + cinfo.setZenodocommunity(cSplit[4]); + cinfo.setType(cSplit[5]); + consumer.accept(cinfo); + }); + + } + + public List getContextRelationResult() { + return contextRelationResult; + } + + public void setContextRelationResult(List contextRelationResult) { + this.contextRelationResult = contextRelationResult; } public ISLookUpService getIsLookUp() { @@ -42,22 +78,62 @@ public class QueryInformationSystem { this.isLookUp = isLookUpService; } - private CommunityMap getMap(List communityMap) { - final CommunityMap map = new CommunityMap(); + public void execContextRelationQuery() throws ISLookUpException { + contextRelationResult = isLookUp.quickSearchProfile(XQUERY); - communityMap.stream().forEach(xml -> { + } + + public void getContextRelation(final Consumer consumer, String category, String prefix) { + + contextRelationResult.forEach(xml -> { + ContextInfo cinfo = new ContextInfo(); final Document doc; + try { + doc = new SAXReader().read(new StringReader(xml)); Element root = doc.getRootElement(); - map.put(root.attribute("id").getValue(), root.attribute("label").getValue()); + cinfo.setId(root.attributeValue("id")); + + Iterator it = root.elementIterator(); + while (it.hasNext()) { + Element el = (Element) it.next(); + if(el.getName().equals("category")){ + String categoryId = el.attributeValue("id"); + categoryId = categoryId.substring(categoryId.lastIndexOf("::") + 2); + if (categoryId.equals(category)) { + cinfo.setDatasourceList(getCategoryList(el, prefix)); + } + } + + } + consumer.accept(cinfo); } catch (DocumentException e) { e.printStackTrace(); } }); - return map; + } + + @NotNull + private List getCategoryList(Element el, String prefix) { + List datasourceList = new ArrayList<>(); + for(Object node : el.selectNodes(".//param")){ + Node n = (Node)node; + if(n.valueOf("./@name").equals("openaireId")){ 
+ datasourceList.add(prefix + "|" + n.getText()); + } + } + +// cat_iterator = el.elementIterator(); +// while (cat_iterator.hasNext()) { +// Element catEl = (Element) cat_iterator.next(); +// if (catEl.getName().equals("param") && catEl.attribute("name").getValue().equals("openaireId")) { +// datasourceList.add(catEl.getText()); +// } +// } + return datasourceList; } } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java new file mode 100644 index 000000000..c1d500e76 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java @@ -0,0 +1,83 @@ +package eu.dnetlib.dhp.oa.graph.dump.graph; + + +import java.io.Serializable; +import java.util.Optional; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.schema.dump.oaf.Result; +import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation; +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +public class SparkCollectAndSave implements Serializable { + + private static final Logger log = LoggerFactory.getLogger(SparkCollectAndSave.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkCollectAndSave.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/input_collect_and_save.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath + "/result"); + run(spark, inputPath, outputPath); + + }); + + + } + + private static void run(SparkSession spark, String inputPath, String outputPath) { + Utils.readPath(spark, inputPath + "/result/publication", Result.class) + .union(Utils.readPath(spark, inputPath + "/result/dataset", Result.class)) + .union(Utils.readPath(spark, inputPath + "/result/otherresearchproduct" , Result.class)) + .union(Utils.readPath(spark, inputPath + "/result/software", Result.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); + + Utils.readPath(spark, inputPath +"/relation/publication", Relation.class) + .union(Utils.readPath(spark, inputPath + "/relation/dataset", Relation.class)) + .union(Utils.readPath(spark, inputPath + "/relation/orp", Relation.class)) + .union(Utils.readPath(spark, inputPath + "/relation/software", Relation.class)) + .union(Utils.readPath(spark, inputPath + "/relation/contextOrg", Relation.class)) + .union(Utils.readPath(spark, inputPath + 
"/relation/context", Relation.class)) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(outputPath + "/relation"); + + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpEntitiesJob.java index 01f576407..a09d5eb84 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpEntitiesJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpEntitiesJob.java @@ -1,84 +1,59 @@ + package eu.dnetlib.dhp.oa.graph.dump.graph; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.oa.graph.dump.DumpProducts; -import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem; -import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; -import eu.dnetlib.dhp.oa.graph.dump.community.SparkDumpCommunityProducts; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.dump.oaf.ControlledField; -import eu.dnetlib.dhp.schema.dump.oaf.Country; -import eu.dnetlib.dhp.schema.dump.oaf.graph.*; -import eu.dnetlib.dhp.schema.oaf.Field; -import eu.dnetlib.dhp.schema.oaf.OafEntity; -import org.apache.avro.generic.GenericData; +import java.io.Serializable; +import java.util.Optional; + import org.apache.commons.io.IOUtils; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.dom4j.Document; -import org.dom4j.DocumentException; -import org.dom4j.Node; -import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.swing.text.html.Option; -import javax.xml.bind.annotation.adapters.CollapsedStringAdapter; -import java.io.Serializable; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; +import eu.dnetlib.dhp.schema.oaf.OafEntity; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +public class SparkDumpEntitiesJob implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkDumpEntitiesJob.class); -public class SparkDumpJob implements Serializable { - private static final Logger log = LoggerFactory.getLogger(SparkDumpCommunityProducts.class); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkDumpEntitiesJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_whole/input_parameters.json")); - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - SparkDumpJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + 
.map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-        Boolean isSparkSessionManaged = Optional
-            .ofNullable(parser.get("isSparkSessionManaged"))
-            .map(Boolean::valueOf)
-            .orElse(Boolean.TRUE);
-        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
-        final String inputPath = parser.get("sourcePath");
-        log.info("inputPath: {}", inputPath);
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
-        final String outputPath = parser.get("outputPath");
-        log.info("outputPath: {}", outputPath);
+        final String resultClassName = parser.get("resultTableName");
+        log.info("resultTableName: {}", resultClassName);
-        final String resultClassName = parser.get("resultTableName");
-        log.info("resultTableName: {}", resultClassName);
+        final String isLookUpUrl = parser.get("isLookUpUrl");
+        log.info("isLookUpUrl: {}", isLookUpUrl);
-        final String isLookUpUrl = parser.get("isLookUpUrl");
-        log.info("isLookUpUrl: {}", isLookUpUrl);
+        Class<? extends OafEntity> inputClazz = (Class<? extends OafEntity>) Class.forName(resultClassName);
-        Class<? extends OafEntity> inputClazz = (Class<? extends OafEntity>) Class.forName(resultClassName);
-
-        QueryInformationSystem queryInformationSystem = new QueryInformationSystem();
-        queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));
-        CommunityMap communityMap = queryInformationSystem.getCommunityMap();
-
-        DumpGraph dg = new DumpGraph();
-        dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap);
-
-
-    }
+        QueryInformationSystem queryInformationSystem = new QueryInformationSystem();
+        queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));
+        CommunityMap communityMap = queryInformationSystem.getCommunityMap();
+        DumpGraphEntities dg = new DumpGraphEntities();
+        dg.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap);
+    }
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpRelationJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpRelationJob.java
index 774f5dd7a..bb9a28580 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpRelationJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkDumpRelationJob.java
@@ -1,4 +1,108 @@
+
 package eu.dnetlib.dhp.oa.graph.dump.graph;
-public class SparkDumpRelationJob {
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.Node;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class SparkDumpRelationJob implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(SparkDumpRelationJob.class);
+
+    public static void main(String[] args) throws Exception {
+        String
jsonConfiguration = IOUtils + .toString( + SparkDumpEntitiesJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump_whole/input_relationdump_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final String inputPath = parser.get("sourcePath"); + log.info("inputPath: {}", inputPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + Utils.removeOutputDir(spark, outputPath); + dumpRelation(spark, inputPath, outputPath); + + }); + + } + + private static void dumpRelation(SparkSession spark, String inputPath, String outputPath) { + Utils + .readPath(spark, inputPath, Relation.class) + .map(relation -> { + eu.dnetlib.dhp.schema.dump.oaf.graph.Relation rel = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation(); + rel + .setSource( + Node + .newInstance( + relation.getSource(), + ModelSupport.idPrefixEntity.get(relation.getSource().substring(0, 2)))); + + rel + .setTarget( + Node + .newInstance( + relation.getTarget(), + ModelSupport.idPrefixEntity.get(relation.getTarget().substring(0, 2)))); + + rel + .setReltype( + RelType + .newInstance( + relation.getRelClass(), + relation.getSubRelType())); + + Optional + .ofNullable(relation.getDataInfo()) + .ifPresent( + datainfo -> rel + .setProvenance( + Provenance + .newInstance(datainfo.getProvenanceaction().getClassname(), datainfo.getTrust()))); + + return rel; + + }, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class)) + .write() + .option("compression", "gzip") + .mode(SaveMode.Overwrite) + .json(outputPath); + + } + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java index 91fc8d4ed..14b5732f5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkExtractRelationFromEntities.java @@ -1,4 +1,62 @@ + package eu.dnetlib.dhp.oa.graph.dump.graph; -public class SparkExtractRelationFromEntities { + +import java.io.Serializable; +import java.util.*; +import org.apache.commons.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.oa.graph.dump.QueryInformationSystem; +import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; +import eu.dnetlib.dhp.schema.oaf.Result; + + +public class SparkExtractRelationFromEntities implements Serializable { + private static final Logger log = LoggerFactory.getLogger(SparkExtractRelationFromEntities.class); + + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + SparkExtractRelationFromEntities.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + 
parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+        final String resultClassName = parser.get("resultTableName");
+        log.info("resultTableName: {}", resultClassName);
+
+        final String isLookUpUrl = parser.get("isLookUpUrl");
+        log.info("isLookUpUrl: {}", isLookUpUrl);
+
+        Class<? extends Result> inputClazz = (Class<? extends Result>) Class.forName(resultClassName);
+
+        QueryInformationSystem queryInformationSystem = new QueryInformationSystem();
+        queryInformationSystem.setIsLookUp(Utils.getIsLookUpService(isLookUpUrl));
+        CommunityMap communityMap = queryInformationSystem.getCommunityMap();
+
+        Extractor extractor = new Extractor();
+        extractor.run(isSparkSessionManaged, inputPath, outputPath, inputClazz, communityMap);
+
+
+    }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java
index 9b8d48bf4..e018fccdc 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java
@@ -1,4 +1,145 @@
 package eu.dnetlib.dhp.oa.graph.dump.graph;
-public class SparkOrganizationRelation {
-}
+import com.google.gson.Gson;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+import eu.dnetlib.dhp.oa.graph.dump.Utils;
+
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.Node;
+import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType;
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkConf;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.ForeachFunction;
+
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+public class SparkOrganizationRelation implements Serializable {
+    private static final Logger log = LoggerFactory.getLogger(SparkOrganizationRelation.class);
+
+    public static void main(String[] args) throws Exception {
+        String jsonConfiguration = IOUtils
+            .toString(
+                SparkOrganizationRelation.class
+                    .getResourceAsStream(
+                        "/eu/dnetlib/dhp/oa/graph/dump_whole/input_organization_parameters.json"));
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+        parser.parseArgument(args);
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final String inputPath = parser.get("sourcePath");
+        log.info("inputPath: {}", inputPath);
+
+        final String outputPath = parser.get("outputPath");
+        log.info("outputPath: {}", outputPath);
+
+
+        final OrganizationMap organizationMap = new Gson().fromJson(parser.get("organizationCommunityMap"), OrganizationMap.class);
+        log.info("organization map : {}", new Gson().toJson(organizationMap));
+
+        SparkConf conf = new SparkConf();
+        AtomicReference<Set<String>> relationSet = null;
+
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                Utils.removeOutputDir(spark, outputPath);
+                writeRelations(spark, extractRelation(spark, inputPath, organizationMap), outputPath, organizationMap);
+
+            });
+
+    }
+
+    private static void writeRelations(SparkSession spark, Set<String> rels, String outputPath, OrganizationMap organizationMap) {
+
+        List<eu.dnetlib.dhp.schema.dump.oaf.graph.Relation> relList = new ArrayList<>();
+
+        rels.forEach(oId -> {
+            organizationMap.get(oId).forEach(community -> {
+                eu.dnetlib.dhp.schema.dump.oaf.graph.Relation direct = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation();
+                eu.dnetlib.dhp.schema.dump.oaf.graph.Relation inverse = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation();
+                String id = Utils.getContextId(community);
+                direct.setSource(Node.newInstance(id, "context"));
+                direct.setTarget(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2))));
+                direct.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP));
+                direct.setProvenance(Provenance.newInstance("Harvested", "0.9"));
+                relList.add(direct);
+                inverse.setTarget(Node.newInstance(id, "context"));
+                inverse.setSource(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2))));
+                inverse.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP));
+                inverse.setProvenance(Provenance.newInstance("Harvested", "0.9"));
+                relList.add(inverse);
+
+            });
+
+        });
+
+        spark.createDataset(relList, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class))
+            .write()
+            .mode(SaveMode.Overwrite)
+            .option("compression", "gzip")
+            .json(outputPath);
+    }
+
+
+
+    private static Set<String> extractRelation(SparkSession spark, String inputPath, OrganizationMap organizationMap) {
+        Dataset<Relation> tmp = Utils.readPath(spark, inputPath, Relation.class);
+        Set<String> organizationSet = organizationMap.keySet();
+        Set<String> toCreateRels = new HashSet<>();
+
+        tmp.foreach((ForeachFunction<Relation>) relation -> {
+            Optional<DataInfo> odInfo = Optional.ofNullable(relation.getDataInfo());
+            if (odInfo.isPresent()) {
+                if (!odInfo.get().getDeletedbyinference()) {
+                    if (relation.getRelClass().equals(ModelConstants.MERGES)) {
+                        String oId = relation.getTarget();
+                        if (organizationSet.contains(oId)) {
+                            organizationSet.remove(oId);
+                            toCreateRels.add(relation.getSource());
+                        }
+                    }
+                }
+            }});
+
+        toCreateRels.addAll(organizationSet);
+        return toCreateRels;
+
+    }
+
+
+
+}
\ No newline at end of file
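
For illustration only, and not taken from the patch: a minimal sketch of the direct/inverse relation pair that SparkOrganizationRelation.writeRelations builds for a single organization/community association. The organization identifier and the community code below are made-up sample values, and the snippet assumes the dhp-schemas dump model classes and the dump Utils referenced above are on the classpath. The same Node/RelType/Provenance construction is what SparkDumpRelationJob applies to every relation read from the input graph.

import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.dump.oaf.Provenance;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Node;
import eu.dnetlib.dhp.schema.dump.oaf.graph.RelType;
import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation;

public class OrganizationRelationExample {
    public static void main(String[] args) {
        // Sample values (hypothetical): an organization id listed in the OrganizationMap
        // and one of the community codes it is associated to.
        String oId = "20|openorgs____::someorganizationid";
        String community = "egi";

        // Context identifier derived from the community code, as done in writeRelations.
        String contextId = Utils.getContextId(community);

        // context -> organization
        Relation direct = new Relation();
        direct.setSource(Node.newInstance(contextId, "context"));
        direct.setTarget(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2))));
        direct.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP));
        direct.setProvenance(Provenance.newInstance("Harvested", "0.9"));

        // organization -> context (inverse direction)
        Relation inverse = new Relation();
        inverse.setSource(Node.newInstance(oId, ModelSupport.idPrefixEntity.get(oId.substring(0, 2))));
        inverse.setTarget(Node.newInstance(contextId, "context"));
        inverse.setReltype(RelType.newInstance(ModelConstants.IS_RELATED_TO, ModelConstants.RELATIONSHIP));
        inverse.setProvenance(Provenance.newInstance("Harvested", "0.9"));

        // Both relations would end up in the "relation" part of the dump that
        // SparkCollectAndSave gathers under outputPath + "/relation".
    }
}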