From 1f893e63dc3cc48bf180fd311ba91503e27fb15e Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 14 Sep 2020 14:33:10 +0200 Subject: [PATCH] - --- .../dhp/schema/dump/pidgraph/Entity.java | 23 ++++++++++++++ .../dhp/oa/graph/dump/DumpProducts.java | 2 +- .../dhp/oa/graph/dump/graph/Constants.java | 1 + .../graph/dump/graph/DumpGraphEntities.java | 7 ++++- .../dhp/oa/graph/dump/graph/Extractor.java | 3 +- .../dhp/oa/graph/dump/graph/Process.java | 5 +++- .../graph/dump/graph/SparkCollectAndSave.java | 20 ++++++------- .../dump/graph/SparkOrganizationRelation.java | 30 +++++++++++++++---- .../communitymapservices.json | 1 + 9 files changed, 72 insertions(+), 20 deletions(-) create mode 100644 dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/pidgraph/Entity.java create mode 100644 dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymapservices.json diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/pidgraph/Entity.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/pidgraph/Entity.java new file mode 100644 index 000000000..52a7a93bc --- /dev/null +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/dump/pidgraph/Entity.java @@ -0,0 +1,23 @@ + +package eu.dnetlib.dhp.schema.dump.pidgraph; + +import java.io.Serializable; + +public class Entity implements Serializable { + private String id; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public static Entity newInstance(String id) { + Entity entity = new Entity(); + entity.id = id; + + return entity; + } +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java index 5333d3d86..1279ede53 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java +++ 
b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/DumpProducts.java @@ -59,7 +59,7 @@ public class DumpProducts implements Serializable { Utils .readPath(spark, inputPath, inputClazz) - .map((MapFunction) value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz)) + .map((MapFunction) value -> execMap(value, communityMap, graph), Encoders.bean(outputClazz)) .filter(Objects::nonNull) .write() .mode(SaveMode.Overwrite) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java index 4c1e1c08c..50e61f7cf 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Constants.java @@ -21,6 +21,7 @@ public class Constants implements Serializable { public static final String CONTEXT_ID = "00"; public static final String CONTEXT_NS_PREFIX = "context_____"; + public static final String UNKNOWN = "UNKNOWN"; // public static final String FUNDER_DS = "entityregistry::projects"; } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java index 38e867b97..24b3ad751 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/DumpGraphEntities.java @@ -484,7 +484,12 @@ public class DumpGraphEntities implements Serializable { Optional .ofNullable(org.getCountry()) .ifPresent( - value -> organization.setCountry(Qualifier.newInstance(value.getClassid(), value.getClassname()))); + value -> { + if (!value.getClassid().equals(Constants.UNKNOWN)) { + 
organization.setCountry(Qualifier.newInstance(value.getClassid(), value.getClassname())); + } + + }); Optional .ofNullable(org.getId()) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java index 73957e41d..e0556ff72 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Extractor.java @@ -27,7 +27,8 @@ import eu.dnetlib.dhp.schema.oaf.Result; * new Relations are created for the datasource in the collectedfrom and hostedby elements and for the context related * to communities and research initiative/infrastructures. For collectedfrom elements it creates: datasource -> provides * -> result and result -> isProvidedBy -> datasource For hostedby elements it creates: datasource -> hosts -> result - * and result -> isHostedBy -> datasource For context elements it creates: context <-> isRelatedTo <-> result + * and result -> isHostedBy -> datasource For context elements it creates: context <-> isRelatedTo <-> result. Note for + * context: it gets the first provenance in the dataInfo. 
If more than one is present the others are not dumped */ public class Extractor implements Serializable { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java index 5fc8fad25..465254a1a 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/Process.java @@ -5,6 +5,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,9 @@ public class Process implements Serializable { ri.setDescription(ci.getDescription()); ri.setName(ci.getName()); - ri.setZenodo_community(Constants.ZENODO_COMMUNITY_PREFIX + ci.getZenodocommunity()); + if (StringUtils.isNotEmpty(ci.getZenodocommunity())) { + ri.setZenodo_community(Constants.ZENODO_COMMUNITY_PREFIX + ci.getZenodocommunity()); + } return (R) ri; } catch (final Exception e) { diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java index a26f85bd7..147f5478b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkCollectAndSave.java @@ -16,7 +16,7 @@ import org.slf4j.LoggerFactory; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.Utils; -import eu.dnetlib.dhp.schema.dump.oaf.Result; +import eu.dnetlib.dhp.schema.dump.oaf.graph.GraphResult; import eu.dnetlib.dhp.schema.dump.oaf.graph.Relation; /** @@ -69,10 +69,10 @@ public class SparkCollectAndSave 
implements Serializable { private static void run(SparkSession spark, String inputPath, String outputPath, boolean aggregate) { if (aggregate) { Utils - .readPath(spark, inputPath + "/result/publication", Result.class) - .union(Utils.readPath(spark, inputPath + "/result/dataset", Result.class)) - .union(Utils.readPath(spark, inputPath + "/result/otherresearchproduct", Result.class)) - .union(Utils.readPath(spark, inputPath + "/result/software", Result.class)) + .readPath(spark, inputPath + "/result/publication", GraphResult.class) + .union(Utils.readPath(spark, inputPath + "/result/dataset", GraphResult.class)) + .union(Utils.readPath(spark, inputPath + "/result/otherresearchproduct", GraphResult.class)) + .union(Utils.readPath(spark, inputPath + "/result/software", GraphResult.class)) .write() .option("compression", "gzip") .mode(SaveMode.Overwrite) @@ -80,19 +80,19 @@ public class SparkCollectAndSave implements Serializable { } else { write( Utils - .readPath(spark, inputPath + "/result/publication", Result.class), + .readPath(spark, inputPath + "/result/publication", GraphResult.class), outputPath + "/publication"); write( Utils - .readPath(spark, inputPath + "/result/dataset", Result.class), + .readPath(spark, inputPath + "/result/dataset", GraphResult.class), outputPath + "/dataset"); write( Utils - .readPath(spark, inputPath + "/result/otherresearchproduct", Result.class), + .readPath(spark, inputPath + "/result/otherresearchproduct", GraphResult.class), outputPath + "/otheresearchproduct"); write( Utils - .readPath(spark, inputPath + "/result/software", Result.class), + .readPath(spark, inputPath + "/result/software", GraphResult.class), outputPath + "/software"); } @@ -112,7 +112,7 @@ public class SparkCollectAndSave implements Serializable { } - private static void write(Dataset dataSet, String outputPath) { + private static void write(Dataset dataSet, String outputPath) { dataSet .write() .option("compression", "gzip") diff --git 
a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java index e597bc2f6..6a01c0061 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/graph/SparkOrganizationRelation.java @@ -19,6 +19,7 @@ import com.google.gson.Gson; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.oa.graph.dump.Utils; +import eu.dnetlib.dhp.oa.graph.dump.community.CommunityMap; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.dump.oaf.Provenance; @@ -59,6 +60,9 @@ public class SparkOrganizationRelation implements Serializable { .fromJson(parser.get("organizationCommunityMap"), OrganizationMap.class); log.info("organization map : {}", new Gson().toJson(organizationMap)); + final String communityMapPath = parser.get("communityMapPath"); + log.info("communityMapPath: {} ", communityMapPath); + SparkConf conf = new SparkConf(); runWithSparkSession( @@ -66,14 +70,17 @@ public class SparkOrganizationRelation implements Serializable { isSparkSessionManaged, spark -> { Utils.removeOutputDir(spark, outputPath); - extractRelation(spark, inputPath, organizationMap, outputPath); + extractRelation(spark, inputPath, organizationMap, outputPath, communityMapPath); }); } private static void extractRelation(SparkSession spark, String inputPath, OrganizationMap organizationMap, - String outputPath) { + String outputPath, String communityMapPath) { + + CommunityMap communityMap = Utils.getCommunityMap(spark, communityMapPath); + Dataset relationDataset = Utils.readPath(spark, inputPath, Relation.class); relationDataset.createOrReplaceTempView("relation"); @@ -97,32 +104,43 @@ public 
class SparkOrganizationRelation implements Serializable { }, Encoders.bean(MergedRels.class)) .filter(Objects::nonNull) .collectAsList() - .forEach(getMergedRelsConsumer(organizationMap, relList)); + .forEach(getMergedRelsConsumer(organizationMap, relList, communityMap)); organizationMap .keySet() .forEach( oId -> organizationMap .get(oId) - .forEach(community -> addRelations(relList, community, oId))); + .forEach(community -> { + if (communityMap.containsKey(community)) { + addRelations(relList, community, oId); + } + })); + // if (relList.size() > 0) { spark .createDataset(relList, Encoders.bean(eu.dnetlib.dhp.schema.dump.oaf.graph.Relation.class)) .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); + // } } @NotNull private static Consumer getMergedRelsConsumer(OrganizationMap organizationMap, - List relList) { + List relList, CommunityMap communityMap) { return mergedRels -> { String oId = mergedRels.getOrganizationId(); organizationMap .get(oId) - .forEach(community -> addRelations(relList, community, mergedRels.getRepresentativeId())); + .forEach(community -> { + if (communityMap.containsKey(community)) { + addRelations(relList, community, mergedRels.getRepresentativeId()); + } + + }); organizationMap.remove(oId); }; } diff --git a/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymapservices.json b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymapservices.json new file mode 100644 index 000000000..905cdabe9 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/test/resources/eu/dnetlib/dhp/oa/graph/dump/communityMapPath/communitymapservices.json @@ -0,0 +1 @@ +{"ee":"SDSN - Greece","epos":"EPOS","enrmaps":"Energy Research","fet-h2020":"FET H2020","instruct":"Instruct-Eric","egi":"EGI Federation","euromarine":"Euromarine","covid-19":"COVID-19","dariah":"DARIAH EU","rda":"Research Data 
Alliance","clarin":"CLARIN","aginfra":"Agricultural and Food Sciences","risis":"RISIS","fam":"Fisheries and Aquaculture Management","beopen":"Transport Research","elixir-gr":"ELIXIR GR","fet-fp7":"FET FP7","ifremer":"Ifremer","science-innovation-policy":"Science and Innovation Policy Studies","mes":"European Marine Science","oa-pg":"EC Post-Grant Open Access Pilot","ni":"Neuroinformatics","dh-ch":"Digital Humanities and Cultural Heritage"} \ No newline at end of file